#include "folly/wangle/bootstrap/ServerBootstrap.h"
#include "folly/wangle/bootstrap/ClientBootstrap.h"
-#include "folly/wangle/channel/ChannelHandler.h"
+#include "folly/wangle/channel/Handler.h"
#include <glog/logging.h>
#include <gtest/gtest.h>
using namespace folly::wangle;
using namespace folly;
-typedef ChannelPipeline<IOBufQueue&, std::unique_ptr<IOBuf>> Pipeline;
+typedef Pipeline<IOBufQueue&, std::unique_ptr<IOBuf>> BytesPipeline;
-typedef ServerBootstrap<Pipeline> TestServer;
-typedef ClientBootstrap<Pipeline> TestClient;
+typedef ServerBootstrap<BytesPipeline> TestServer;
+typedef ClientBootstrap<BytesPipeline> TestClient;
-class TestClientPipelineFactory : public PipelineFactory<Pipeline> {
+class TestClientPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
CHECK(sock->good());
// We probably aren't connected immedately, check after a small delay
}
};
-class TestPipelineFactory : public PipelineFactory<Pipeline> {
+class TestPipelineFactory : public PipelineFactory<BytesPipeline> {
public:
- Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+ BytesPipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
pipelines++;
- return new Pipeline();
+ return new BytesPipeline();
}
std::atomic<int> pipelines{0};
};
+class TestAcceptor : public Acceptor {
+EventBase base_;
+ public:
+ TestAcceptor() : Acceptor(ServerSocketConfig()) {
+ Acceptor::init(nullptr, &base_);
+ }
+ void onNewConnection(
+ AsyncSocket::UniquePtr sock,
+ const folly::SocketAddress* address,
+ const std::string& nextProtocolName,
+ const TransportInfo& tinfo) {
+ }
+};
+
+class TestAcceptorFactory : public AcceptorFactory {
+ public:
+ std::shared_ptr<Acceptor> newAcceptor(EventBase* base) {
+ return std::make_shared<TestAcceptor>();
+ }
+};
+
TEST(Bootstrap, Basic) {
TestServer server;
TestClient client;
server.stop();
}
+TEST(Bootstrap, ServerWithChildHandler) {
+ TestServer server;
+ server.childHandler(std::make_shared<TestAcceptorFactory>());
+ server.bind(0);
+ server.stop();
+}
+
TEST(Bootstrap, ClientServerTest) {
TestServer server;
auto factory = std::make_shared<TestPipelineFactory>();
folly::AsyncServerSocket::UniquePtr socket(new AsyncServerSocket);
server.bind(std::move(socket));
}
+
+std::atomic<int> connections{0};
+
+class TestHandlerPipeline
+ : public HandlerAdapter<void*,
+ std::exception> {
+ public:
+ void read(Context* ctx, void* conn) {
+ connections++;
+ return ctx->fireRead(conn);
+ }
+
+ Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+};
+
+template <typename HandlerPipeline>
+class TestHandlerPipelineFactory
+ : public PipelineFactory<ServerBootstrap<BytesPipeline>::AcceptPipeline> {
+ public:
+ ServerBootstrap<BytesPipeline>::AcceptPipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+ auto pipeline = new ServerBootstrap<BytesPipeline>::AcceptPipeline;
+ auto handler = std::make_shared<HandlerPipeline>();
+ pipeline->addBack(HandlerPtr<HandlerPipeline>(handler));
+ return pipeline;
+ }
+};
+
+TEST(Bootstrap, LoadBalanceHandler) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ server.childPipeline(factory);
+
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestHandlerPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.bind(0);
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ TestClient client;
+ client.pipelineFactory(std::make_shared<TestClientPipelineFactory>());
+ client.connect(address);
+ base->loop();
+ server.stop();
+
+ CHECK(factory->pipelines == 1);
+ CHECK(connections == 1);
+}
+
+class TestUDPPipeline
+ : public HandlerAdapter<void*,
+ std::exception> {
+ public:
+ void read(Context* ctx, void* conn) {
+ connections++;
+ }
+
+ Future<void> write(Context* ctx, std::exception e) {
+ return ctx->fireWrite(e);
+ }
+};
+
+TEST(Bootstrap, UDP) {
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
+ server.bind(0);
+}
+
+TEST(Bootstrap, UDPClientServerTest) {
+ connections = 0;
+
+ TestServer server;
+ auto factory = std::make_shared<TestPipelineFactory>();
+ auto pipelinefactory =
+ std::make_shared<TestHandlerPipelineFactory<TestUDPPipeline>>();
+ server.pipeline(pipelinefactory);
+ server.channelFactory(std::make_shared<AsyncUDPServerSocketFactory>());
+ server.bind(0);
+
+ auto base = EventBaseManager::get()->getEventBase();
+
+ SocketAddress address;
+ server.getSockets()[0]->getAddress(&address);
+
+ SocketAddress localhost("::1", 0);
+ AsyncUDPSocket client(base);
+ client.bind(localhost);
+ auto data = IOBuf::create(1);
+ data->append(1);
+ *(data->writableData()) = 'a';
+ client.write(address, std::move(data));
+ base->loop();
+ server.stop();
+
+ CHECK(connections == 1);
+}