From c152d43dedf865196eb47c799c811ebcf4927188 Mon Sep 17 00:00:00 2001 From: Dave Watson Date: Fri, 23 Jan 2015 09:50:35 -0800 Subject: [PATCH] Finagle interfaces Summary: Future service interfaces similar to finagle. Service creators for client, filters Test Plan: Unittests included - also sets up a simple pipeline to test a full stack client/server. Reviewed By: hans@fb.com Subscribers: jsedgwick, trunkagent, njormrod, folly-diffs@, doug, fugalh FB internal diff: D1573086 Tasks: 5002456 Signature: t1:1573086:1421970698:328453c4a980bb6950fc9aeed6a2b6d9819c20db --- folly/wangle/bootstrap/BootstrapTest.cpp | 22 ++- folly/wangle/bootstrap/ClientBootstrap.h | 15 +- folly/wangle/service/ClientDispatcher.h | 63 +++++++ folly/wangle/service/ServerDispatcher.h | 45 +++++ folly/wangle/service/Service.h | 110 ++++++++++++ folly/wangle/service/ServiceTest.cpp | 220 +++++++++++++++++++++++ 6 files changed, 467 insertions(+), 8 deletions(-) create mode 100644 folly/wangle/service/ClientDispatcher.h create mode 100644 folly/wangle/service/ServerDispatcher.h create mode 100644 folly/wangle/service/Service.h create mode 100644 folly/wangle/service/ServiceTest.cpp diff --git a/folly/wangle/bootstrap/BootstrapTest.cpp b/folly/wangle/bootstrap/BootstrapTest.cpp index 1567978b..9f2f664e 100644 --- a/folly/wangle/bootstrap/BootstrapTest.cpp +++ b/folly/wangle/bootstrap/BootstrapTest.cpp @@ -26,13 +26,11 @@ using namespace folly; typedef ChannelPipeline> Pipeline; -class TestServer : public ServerBootstrap { - Pipeline* newPipeline(std::shared_ptr) { - return nullptr; - } -}; +typedef ServerBootstrap TestServer; +typedef ClientBootstrap TestClient; -class TestClient : public ClientBootstrap { +class TestClientPipelineFactory : public PipelineFactory { + public: Pipeline* newPipeline(std::shared_ptr sock) { CHECK(sock->good()); @@ -76,6 +74,7 @@ TEST(Bootstrap, ClientServerTest) { server.getSockets()[0]->getAddress(&address); TestClient client; + client.pipelineFactory(std::make_shared()); client.connect(address); base->loop(); server.stop(); @@ -98,9 +97,12 @@ TEST(Bootstrap, ClientConnectionManagerTest) { server.getSockets()[0]->getAddress(&address); TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); TestClient client2; + client2.pipelineFactory(std::make_shared()); client2.connect(address); base->loop(); @@ -124,6 +126,7 @@ TEST(Bootstrap, ServerAcceptGroupTest) { boost::barrier barrier(2); auto thread = std::thread([&](){ TestClient client; + client.pipelineFactory(std::make_shared()); client.connect(address); EventBaseManager::get()->getEventBase()->loop(); barrier.wait(); @@ -162,6 +165,8 @@ TEST(Bootstrap, ServerAcceptGroup2Test) { server.getSockets()[0]->getAddress(&address); TestClient client; + client.pipelineFactory(std::make_shared()); + client.connect(address); EventBaseManager::get()->getEventBase()->loop(); @@ -198,18 +203,23 @@ TEST(Bootstrap, SharedThreadPool) { server.getSockets()[0]->getAddress(&address); TestClient client; + client.pipelineFactory(std::make_shared()); client.connect(address); TestClient client2; + client2.pipelineFactory(std::make_shared()); client2.connect(address); TestClient client3; + client3.pipelineFactory(std::make_shared()); client3.connect(address); TestClient client4; + client4.pipelineFactory(std::make_shared()); client4.connect(address); TestClient client5; + client5.pipelineFactory(std::make_shared()); client5.connect(address); EventBaseManager::get()->getEventBase()->loop(); diff --git a/folly/wangle/bootstrap/ClientBootstrap.h b/folly/wangle/bootstrap/ClientBootstrap.h index 8ee8fad9..40105195 100644 --- a/folly/wangle/bootstrap/ClientBootstrap.h +++ b/folly/wangle/bootstrap/ClientBootstrap.h @@ -33,13 +33,24 @@ class ClientBootstrap { return this; } ClientBootstrap* connect(SocketAddress address) { + DCHECK(pipelineFactory_); pipeline_.reset( - newPipeline( + pipelineFactory_->newPipeline( AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address) )); return this; } + ClientBootstrap* pipelineFactory( + std::shared_ptr> factory) { + pipelineFactory_ = factory; + return this; + } + + Pipeline* getPipeline() { + return pipeline_.get(); + } + virtual ~ClientBootstrap() {} protected: @@ -48,7 +59,7 @@ class ClientBootstrap { int port_; - virtual Pipeline* newPipeline(std::shared_ptr socket) = 0; + std::shared_ptr> pipelineFactory_; }; } // namespace diff --git a/folly/wangle/service/ClientDispatcher.h b/folly/wangle/service/ClientDispatcher.h new file mode 100644 index 00000000..a42a74f4 --- /dev/null +++ b/folly/wangle/service/ClientDispatcher.h @@ -0,0 +1,63 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { namespace wangle { + +/** + * Dispatch a request, satisfying Promise `p` with the response; + * the returned Future is satisfied when the response is received: + * only one request is allowed at a time. + */ +template +class SerialClientDispatcher : public ChannelHandlerAdapter + , public Service { + public: + + typedef typename ChannelHandlerAdapter::Context Context; + + void setPipeline(Pipeline* pipeline) { + pipeline_ = pipeline; + pipeline->addBack( + ChannelHandlerPtr, false>( + this)); + pipeline->finalize(); + } + + void read(Context* ctx, Req in) override { + DCHECK(p_); + p_->setValue(std::move(in)); + p_ = none; + } + + virtual Future operator()(Req arg) override { + CHECK(!p_); + DCHECK(pipeline_); + + p_ = Promise(); + auto f = p_->getFuture(); + pipeline_->write(std::move(arg)); + return f; + } + + private: + Pipeline* pipeline_{nullptr}; + folly::Optional> p_; +}; + +}} // namespace diff --git a/folly/wangle/service/ServerDispatcher.h b/folly/wangle/service/ServerDispatcher.h new file mode 100644 index 00000000..7ec3a4dc --- /dev/null +++ b/folly/wangle/service/ServerDispatcher.h @@ -0,0 +1,45 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include + +namespace folly { namespace wangle { + +/** + * Dispatch requests from pipeline one at a time synchronously. + * Concurrent requests are queued in the pipeline. + */ +template +class SerialServerDispatcher : public ChannelHandlerAdapter { + public: + + typedef typename ChannelHandlerAdapter::Context Context; + + explicit SerialServerDispatcher(Service* service) + : service_(service) {} + + void read(Context* ctx, Req in) override { + auto resp = (*service_)(std::move(in)).get(); + ctx->fireWrite(std::move(resp)); + } + + private: + + Service* service_; +}; + +}} // namespace diff --git a/folly/wangle/service/Service.h b/folly/wangle/service/Service.h new file mode 100644 index 00000000..7d78defd --- /dev/null +++ b/folly/wangle/service/Service.h @@ -0,0 +1,110 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#pragma once + +#include +#include + +#include +#include +#include +#include + +namespace folly { + +/** + * A Service is an asynchronous function from Request to + * Future. It is the basic unit of the RPC interface. + */ +template +class Service { + public: + virtual Future operator()(Req request) = 0; + virtual ~Service() {} +}; + +/** + * A Filter acts as a decorator/transformer of a service. It may apply + * transformations to the input and output of that service: + * + * class MyService + * + * ReqA -> | + * | -> ReqB + * | <- RespB + * RespA <- | + * + * For example, you may have a service that takes Strings and parses + * them as Ints. If you want to expose this as a Network Service via + * Thrift, it is nice to isolate the protocol handling from the + * business rules. Hence you might have a Filter that converts back + * and forth between Thrift structs: + * + * [ThriftIn -> (String -> Int) -> ThriftOut] + */ +template +class Filter { + public: + virtual Future operator()( + Service* service, ReqA request) = 0; + std::unique_ptr> compose( + Service* service); + virtual ~Filter() {} +}; + +template +class ComposedService : public Service { + public: + ComposedService(Service* service, + Filter* filter) + : service_(service) + , filter_(filter) {} + virtual Future operator()(ReqA request) override { + return (*filter_)(service_, request); + } + + ~ComposedService(){} + private: + Service* service_; + Filter* filter_; +}; + +template + std::unique_ptr> + Filter::compose(Service* service) { + return folly::make_unique>( + service, this); +} + +/** + * A factory that creates services, given a client. This lets you + * make RPC calls on the Service interface over a client's pipeline. + * + * Clients can be reused after you are done using the service. + */ +template +class ServiceFactory { + public: + virtual Future*> operator()( + ClientBootstrap* client) = 0; + + virtual ~ServiceFactory() = default; +}; + +} // namespace diff --git a/folly/wangle/service/ServiceTest.cpp b/folly/wangle/service/ServiceTest.cpp new file mode 100644 index 00000000..d54ac9ec --- /dev/null +++ b/folly/wangle/service/ServiceTest.cpp @@ -0,0 +1,220 @@ +/* + * Copyright 2015 Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +#include + +#include +#include +#include + +namespace folly { + +using namespace wangle; + +typedef ChannelPipeline Pipeline; + +class EchoService : public Service { + public: + virtual Future operator()(std::string req) override { + return makeFuture(std::move(req)); + } +}; + +class EchoIntService : public Service { + public: + virtual Future operator()(std::string req) override { + return makeFuture(folly::to(req)); + } +}; + +class StringCodec : public ChannelHandler> { + public: + typedef typename ChannelHandler< + IOBufQueue&, std::string, + std::string, std::unique_ptr>::Context Context; + + void read(Context* ctx, IOBufQueue& q) override { + auto buf = q.pop_front(); + buf->coalesce(); + std::string data((const char*)buf->data(), buf->length()); + + ctx->fireRead(data); + } + + Future write(Context* ctx, std::string msg) override { + auto buf = IOBuf::copyBuffer(msg.data(), msg.length()); + return ctx->fireWrite(std::move(buf)); + } +}; + +template +class ServerPipelineFactory + : public PipelineFactory { + public: + + Pipeline* newPipeline( + std::shared_ptr socket) override { + auto pipeline = new Pipeline(); + pipeline->addBack(AsyncSocketHandler(socket)); + pipeline->addBack(StringCodec()); + pipeline->addBack(SerialServerDispatcher(&service_)); + pipeline->finalize(); + pipeline->template getHandler(0)->attachReadCallback(); + return pipeline; + } + + private: + EchoService service_; +}; + +template +class ClientPipelineFactory : public PipelineFactory { + public: + + Pipeline* newPipeline( + std::shared_ptr socket) override { + auto pipeline = new Pipeline(); + pipeline->addBack(AsyncSocketHandler(socket)); + pipeline->addBack(StringCodec()); + pipeline->template getHandler(0)->attachReadCallback(); + + return pipeline; + } +}; + +template +class ClientServiceFactory : public ServiceFactory { + public: + class ClientService : public Service { + public: + explicit ClientService(Pipeline* pipeline) { + dispatcher_.setPipeline(pipeline); + } + Future operator()(Req request) override { + return dispatcher_(std::move(request)); + } + private: + SerialClientDispatcher dispatcher_; + }; + + Future*> operator()( + ClientBootstrap* client) override { + return makeFuture*>( + new ClientService(client->getPipeline())); + } +}; + +TEST(Wangle, ClientServerTest) { + int port = 1234; + // server + + ServerBootstrap server; + server.childPipeline( + std::make_shared>()); + server.bind(port); + + // client + ClientBootstrap client; + ClientServiceFactory serviceFactory; + client.pipelineFactory( + std::make_shared>()); + SocketAddress addr("127.0.0.1", port); + client.connect(addr); + auto service = serviceFactory(&client).value(); + auto rep = (*service)("test"); + + rep.then([&](std::string value) { + EXPECT_EQ("test", value); + EventBaseManager::get()->getEventBase()->terminateLoopSoon(); + + }); + EventBaseManager::get()->getEventBase()->loopForever(); + server.stop(); +} + +class AppendFilter : public Filter { + public: + virtual Future operator()( + Service* service, std::string req) { + return (*service)(req + "\n"); + } +}; + +class IntToStringFilter : public Filter { + public: + virtual Future operator()( + Service* service, int req) { + return (*service)(folly::to(req)).then([](std::string resp) { + return folly::to(resp); + }); + } +}; + +TEST(Wangle, FilterTest) { + auto service = folly::make_unique(); + auto filter = folly::make_unique(); + auto result = (*filter)(service.get(), "test"); + EXPECT_EQ(result.value(), "test\n"); + + // Check composition + auto composed_service = filter->compose(service.get()); + auto result2 = (*composed_service)("test"); + EXPECT_EQ(result2.value(), "test\n"); +} + +TEST(Wangle, ComplexFilterTest) { + auto service = folly::make_unique(); + auto filter = folly::make_unique(); + auto result = (*filter)(service.get(), 1); + EXPECT_EQ(result.value(), 1); + + // Check composition + auto composed_service = filter->compose(service.get()); + auto result2 = (*composed_service)(2); + EXPECT_EQ(result2.value(), 2); +} + +class ChangeTypeFilter : public Filter { + public: + virtual Future operator()( + Service* service, int req) { + return (*service)(folly::to(req)).then([](int resp) { + return folly::to(resp); + }); + } +}; + +TEST(Wangle, SuperComplexFilterTest) { + auto service = folly::make_unique(); + auto filter = folly::make_unique(); + auto result = (*filter)(service.get(), 1); + EXPECT_EQ(result.value(), "1"); + + // Check composition + auto composed_service = filter->compose(service.get()); + auto result2 = (*composed_service)(2); + EXPECT_EQ(result2.value(), "2"); +} + +int main(int argc, char** argv) { + testing::InitGoogleTest(&argc, argv); + google::InitGoogleLogging(argv[0]); + google::ParseCommandLineFlags(&argc, &argv, true); + + return RUN_ALL_TESTS(); +} + +} // namespace -- 2.34.1