2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/channel/Pipeline.h>
19 #include <folly/wangle/concurrent/IOThreadPoolExecutor.h>
20 #include <folly/io/async/AsyncSocket.h>
21 #include <folly/io/async/EventBaseManager.h>
26 * A thin wrapper around Pipeline and AsyncSocket to match
27 * ServerBootstrap. On connect() a new pipeline is created.
// NOTE(review): this listing looks like a lossy extraction. The embedded line
// numbers skip values, and closing braces, access specifiers and a few short
// statements are not visible. The comments below describe only what the
// visible lines show; anything else is marked as an assumption.
//
// ClientBootstrap<Pipeline>: client-side helper that, in connect(), creates an
// AsyncSocket on an event base and builds a Pipeline for that socket through
// the configured PipelineFactory. The future returned by connect() is
// fulfilled (or failed) by the nested ConnectCallback.
29 template <typename Pipeline>
30 class ClientBootstrap {
// Connect callback created in connect(). It owns the promise whose future
// connect() hands back to the caller, and keeps a back-pointer to the
// bootstrap so it can reach the pipeline.
32 class ConnectCallback : public AsyncSocket::ConnectCallback {
// Takes the promise by value and moves it into promise_; stores the bootstrap
// pointer as-is.
34 ConnectCallback(Promise<Pipeline*> promise, ClientBootstrap* bootstrap)
35 : promise_(std::move(promise))
36 , bootstrap_(bootstrap) {}
// Success path: if the bootstrap currently has a pipeline, notify it with
// transportActive(); then fulfil the promise with the bootstrap's pipeline
// pointer.
// NOTE(review): the line closing the `if` is not visible; setValue() is
// presumably outside the `if` and so may deliver a null pipeline -- confirm.
38 void connectSuccess() noexcept override {
39 if (bootstrap_->getPipeline()) {
40 bootstrap_->getPipeline()->transportActive();
42 promise_.setValue(bootstrap_->getPipeline());
// Failure path: fail the promise with a copy of the socket exception wrapped
// by folly::make_exception_wrapper.
46 void connectErr(const AsyncSocketException& ex) noexcept override {
47 promise_.setException(
48 folly::make_exception_wrapper<AsyncSocketException>(ex));
// promise_  : promise backing the future returned from connect().
// bootstrap_: raw pointer to the bootstrap that created this callback.
// NOTE(review): nothing visible here manages bootstrap_'s lifetime; the
// bootstrap presumably has to outlive a pending connect -- confirm.
52 Promise<Pipeline*> promise_;
53 ClientBootstrap* bootstrap_;
// Setter-style method taking the IO thread pool executor; returns a
// ClientBootstrap*.
// NOTE(review): the body is not visible; presumably it stores the argument in
// group_ and returns this -- confirm.
60 ClientBootstrap* group(
61 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group) {
// Setter-style method taking a port number; returns a ClientBootstrap*.
// NOTE(review): the body is not visible in this listing -- confirm what it
// does with `port`.
65 ClientBootstrap* bind(int port) {
// Creates a socket and a pipeline for `address` and returns a future for the
// pipeline pointer.
//  - Debug-checks that a pipeline factory has been set.
//  - Starts from the EventBaseManager's event base, then takes one from
//    group_. NOTE(review): the guard before that assignment is not visible;
//    presumably it applies only when group_ is set -- confirm.
//  - retval starts as a future holding nullptr and is replaced inside the
//    lambda by the future of a fresh promise.
//  - The lambda captures by reference (retval, base, address, this).
//    NOTE(review): this relies on the call not returning before the lambda has
//    run, which the callee's name suggests -- confirm.
//  - The `new ConnectCallback(...)` line is the tail of a call whose first
//    line is not visible (presumably the socket's connect) -- confirm.
//    NOTE(review): no matching delete for the callback is visible; confirm
//    that the callback frees itself.
//  - Finally pipeline_ is replaced with a pipeline built for the new socket.
//    NOTE(review): the return statement is not visible; presumably it returns
//    retval -- confirm.
69 Future<Pipeline*> connect(SocketAddress address) {
70 DCHECK(pipelineFactory_);
71 auto base = EventBaseManager::get()->getEventBase();
73 base = group_->getEventBase();
75 Future<Pipeline*> retval((Pipeline*)nullptr);
76 base->runImmediatelyOrRunInEventBaseThreadAndWait([&](){
77 auto socket = AsyncSocket::newSocket(base);
78 Promise<Pipeline*> promise;
79 retval = promise.getFuture();
81 new ConnectCallback(std::move(promise), this), address);
82 pipeline_ = pipelineFactory_->newPipeline(socket);
// Stores the factory that connect() uses to build pipelines; returns a
// ClientBootstrap*.
// NOTE(review): the return statement is not visible; presumably it returns
// this -- confirm.
87 ClientBootstrap* pipelineFactory(
88 std::shared_ptr<PipelineFactory<Pipeline>> factory) {
89 pipelineFactory_ = factory;
// Returns the raw pointer held by pipeline_ without transferring ownership
// (null when pipeline_ holds nothing).
93 Pipeline* getPipeline() {
94 return pipeline_.get();
// Virtual destructor with an empty body.
97 virtual ~ClientBootstrap() {}
// The pipeline assigned in connect(), held in a unique_ptr whose deleter is
// folly::DelayedDestruction::Destructor.
100 std::unique_ptr<Pipeline,
101 folly::DelayedDestruction::Destructor> pipeline_;
// pipelineFactory_: factory used by connect() to build a pipeline per socket.
// group_          : executor that connect() takes its event base from.
105 std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
106 std::shared_ptr<folly::wangle::IOThreadPoolExecutor> group_;