server bootstrap
authorDave Watson <davejwatson@fb.com>
Tue, 28 Oct 2014 21:46:19 +0000 (14:46 -0700)
committerDave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:53:35 +0000 (12:53 -0800)
Summary:
ServerBootstrap a la netty.

This should be enough for some refactoring of thrift server and proxygen servers - but there are still lots of TODOs left to do

Test Plan:
Unittests included

Depends on D1638358

Reviewed By: jsedgwick@fb.com

Subscribers: trunkagent, doug, fugalh, alandau, bmatheny, mshneer, jsedgwick, afrind, dcsommer

FB internal diff: D1649521

Tasks: 5488516

Signature: t1:1649521:1416256073:fc003fd471bdfd137160dd6d7befd933ee8addd2

folly/experimental/wangle/acceptor/Acceptor.cpp
folly/experimental/wangle/acceptor/Acceptor.h
folly/experimental/wangle/bootstrap/BootstrapTest.cpp [new file with mode: 0644]
folly/experimental/wangle/bootstrap/ClientBootstrap.h [new file with mode: 0644]
folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h [new file with mode: 0644]
folly/experimental/wangle/bootstrap/ServerBootstrap.cpp [new file with mode: 0644]
folly/experimental/wangle/bootstrap/ServerBootstrap.h [new file with mode: 0644]
folly/experimental/wangle/channel/ChannelPipeline.h
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h

index 534e6f4f07568988977249779d61d76e14134fdf..bd9c67f19c06cd35042b3db80204f53b15f92903 100644 (file)
@@ -188,14 +188,16 @@ Acceptor::init(AsyncServerSocket* serverSocket,
   downstreamConnectionManager_ = ConnectionManager::makeUnique(
     eventBase, accConfig_.connectionIdleTimeout, this);
 
-  serverSocket->addAcceptCallback(this, eventBase);
-  // SO_KEEPALIVE is the only setting that is inherited by accepted
-  // connections so only apply this setting
-  for (const auto& option: socketOptions_) {
-    if (option.first.level == SOL_SOCKET &&
-        option.first.optname == SO_KEEPALIVE && option.second == 1) {
-      serverSocket->setKeepAliveEnabled(true);
-      break;
+  if (serverSocket) {
+    serverSocket->addAcceptCallback(this, eventBase);
+    // SO_KEEPALIVE is the only setting that is inherited by accepted
+    // connections so only apply this setting
+    for (const auto& option: socketOptions_) {
+      if (option.first.level == SOL_SOCKET &&
+          option.first.optname == SO_KEEPALIVE && option.second == 1) {
+        serverSocket->setKeepAliveEnabled(true);
+        break;
+      }
     }
   }
 }
index 69597018cc304b99791e8867fdafade4544104c3..404425e70976adccdf5478bc1ad858d06ed8d2ad 100644 (file)
@@ -98,7 +98,7 @@ class Acceptor :
   /**
    * Access the Acceptor's event base.
    */
-  EventBase* getEventBase() { return base_; }
+  virtual EventBase* getEventBase() const { return base_; }
 
   /**
    * Access the Acceptor's downstream (client-side) ConnectionManager
@@ -173,6 +173,12 @@ class Acceptor :
     std::chrono::steady_clock::time_point acceptTime
   ) noexcept;
 
+  /**
+   * Drains all open connections of their outstanding transactions. When
+   * a connection's transaction count reaches zero, the connection closes.
+   */
+  void drainAllConnections();
+
  protected:
   friend class AcceptorHandshakeHelper;
 
@@ -239,11 +245,7 @@ class Acceptor :
    */
   void dropAllConnections();
 
-  /**
-   * Drains all open connections of their outstanding transactions. When
-   * a connection's transaction count reaches zero, the connection closes.
-   */
-  void drainAllConnections();
+ protected:
 
   /**
    * onConnectionsDrained() will be called once all connections have been
@@ -335,4 +337,10 @@ class Acceptor :
   std::shared_ptr<SSLCacheProvider> cacheProvider_;
 };
 
+class AcceptorFactory {
+ public:
+  virtual std::shared_ptr<Acceptor> newAcceptor() = 0;
+  virtual ~AcceptorFactory() = default;
+};
+
 } // namespace
diff --git a/folly/experimental/wangle/bootstrap/BootstrapTest.cpp b/folly/experimental/wangle/bootstrap/BootstrapTest.cpp
new file mode 100644 (file)
index 0000000..b8461b4
--- /dev/null
@@ -0,0 +1,120 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "folly/experimental/wangle/bootstrap/ServerBootstrap.h"
+#include "folly/experimental/wangle/bootstrap/ClientBootstrap.h"
+#include "folly/experimental/wangle/channel/ChannelHandler.h"
+
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+using namespace folly;
+
+typedef ChannelHandlerAdapter<IOBuf> BytesPassthrough;
+typedef ChannelPipeline<BytesPassthrough> Pipeline;
+
+class TestServer : public ServerBootstrap<Pipeline> {
+  Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) {
+    return nullptr;
+  }
+};
+
+class TestClient : public ClientBootstrap<Pipeline> {
+  Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+    CHECK(sock->good());
+
+    // We probably aren't connected immedately, check after a small delay
+    EventBaseManager::get()->getEventBase()->runAfterDelay([sock](){
+      CHECK(sock->readable());
+    }, 100);
+    return nullptr;
+  }
+};
+
+class TestPipelineFactory : public PipelineFactory<Pipeline> {
+ public:
+  Pipeline* newPipeline(std::shared_ptr<AsyncSocket> sock) {
+    pipelines++;
+    return new Pipeline(BytesPassthrough());
+  }
+  std::atomic<int> pipelines{0};
+};
+
+TEST(Bootstrap, Basic) {
+  TestServer server;
+  TestClient client;
+}
+
+TEST(Bootstrap, ServerWithPipeline) {
+  TestServer server;
+  server.childPipeline(std::make_shared<TestPipelineFactory>());
+  server.bind(0);
+  auto base = EventBaseManager::get()->getEventBase();
+  base->runAfterDelay([&](){
+    server.stop();
+  }, 500);
+  base->loop();
+}
+
+TEST(Bootstrap, ClientServerTest) {
+  TestServer server;
+  auto factory = std::make_shared<TestPipelineFactory>();
+  server.childPipeline(factory);
+  server.bind(0);
+  auto base = EventBaseManager::get()->getEventBase();
+
+  SocketAddress address;
+  server.getSockets()[0]->getAddress(&address);
+
+  TestClient client;
+  client.connect(address);
+  base->runAfterDelay([&](){
+    server.stop();
+  }, 500);
+  base->loop();
+
+  CHECK(factory->pipelines == 1);
+}
+
+TEST(Bootstrap, ClientConnectionManagerTest) {
+  // Create a single IO thread, and verify that
+  // client connections are pooled properly
+
+  TestServer server;
+  auto factory = std::make_shared<TestPipelineFactory>();
+  server.childPipeline(factory);
+  server.group(std::make_shared<IOThreadPoolExecutor>(1));
+  server.bind(0);
+  auto base = EventBaseManager::get()->getEventBase();
+
+  SocketAddress address;
+  server.getSockets()[0]->getAddress(&address);
+
+  TestClient client;
+  client.connect(address);
+
+  TestClient client2;
+  client2.connect(address);
+
+  base->runAfterDelay([&](){
+    server.stop();
+  }, 500);
+
+  base->loop();
+
+  CHECK(factory->pipelines == 2);
+}
diff --git a/folly/experimental/wangle/bootstrap/ClientBootstrap.h b/folly/experimental/wangle/bootstrap/ClientBootstrap.h
new file mode 100644 (file)
index 0000000..dadbf5c
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/channel/ChannelPipeline.h>
+
+namespace folly {
+
+/*
+ * A thin wrapper around ChannelPipeline and AsyncSocket to match
+ * ServerBootstrap.  On connect() a new pipeline is created.
+ */
+template <typename Pipeline>
+class ClientBootstrap {
+ public:
+  ClientBootstrap() {
+  }
+  ClientBootstrap* bind(int port) {
+    port_ = port;
+    return this;
+  }
+  ClientBootstrap* connect(SocketAddress address) {
+    pipeline_.reset(
+      newPipeline(
+        AsyncSocket::newSocket(EventBaseManager::get()->getEventBase(), address)
+      ));
+    return this;
+  }
+
+  virtual ~ClientBootstrap() {}
+
+ protected:
+  std::unique_ptr<Pipeline,
+                  folly::DelayedDestruction::Destructor> pipeline_;
+
+  int port_;
+
+  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket> socket) = 0;
+};
+
+} // namespace
diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h b/folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h
new file mode 100644 (file)
index 0000000..7268e2a
--- /dev/null
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/acceptor/Acceptor.h>
+#include <folly/io/async/EventBaseManager.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <folly/experimental/wangle/ManagedConnection.h>
+#include <folly/experimental/wangle/channel/ChannelPipeline.h>
+
+namespace folly {
+
+template <typename Pipeline>
+class ServerAcceptor : public Acceptor {
+  typedef std::unique_ptr<Pipeline,
+                          folly::DelayedDestruction::Destructor> PipelinePtr;
+
+  class ServerConnection : public wangle::ManagedConnection {
+   public:
+    explicit ServerConnection(PipelinePtr pipeline)
+        : pipeline_(std::move(pipeline)) {}
+
+    ~ServerConnection() {
+    }
+
+    void timeoutExpired() noexcept {
+    }
+
+    void describe(std::ostream& os) const {}
+    bool isBusy() const {
+      return false;
+    }
+    void notifyPendingShutdown() {}
+    void closeWhenIdle() {}
+    void dropConnection() {}
+    void dumpConnectionState(uint8_t loglevel) {}
+   private:
+    PipelinePtr pipeline_;
+  };
+
+ public:
+  explicit ServerAcceptor(
+    std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory)
+      : Acceptor(ServerSocketConfig())
+      , pipelineFactory_(pipelineFactory) {
+    Acceptor::init(nullptr, &base_);
+  }
+
+  /* See Acceptor::onNewConnection for details */
+  void onNewConnection(
+    AsyncSocket::UniquePtr transport, const SocketAddress* address,
+    const std::string& nextProtocolName, const TransportInfo& tinfo) {
+
+      std::unique_ptr<Pipeline,
+                       folly::DelayedDestruction::Destructor>
+      pipeline(pipelineFactory_->newPipeline(
+        std::shared_ptr<AsyncSocket>(
+          transport.release(),
+          folly::DelayedDestruction::Destructor())));
+    auto connection = new ServerConnection(std::move(pipeline));
+    Acceptor::addConnection(connection);
+  }
+
+  ~ServerAcceptor() {
+    Acceptor::dropAllConnections();
+  }
+
+ private:
+  EventBase base_;
+
+  std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+};
+
+template <typename Pipeline>
+class ServerAcceptorFactory : public AcceptorFactory {
+ public:
+  explicit ServerAcceptorFactory(
+      std::shared_ptr<PipelineFactory<Pipeline>> factory)
+    : factory_(factory) {}
+
+  std::shared_ptr<Acceptor> newAcceptor() {
+    return std::make_shared<ServerAcceptor<Pipeline>>(factory_);
+  }
+ private:
+  std::shared_ptr<PipelineFactory<Pipeline>> factory_;
+};
+
+class ServerWorkerFactory : public folly::wangle::ThreadFactory {
+ public:
+  explicit ServerWorkerFactory(std::shared_ptr<AcceptorFactory> acceptorFactory)
+      : internalFactory_(
+        std::make_shared<folly::wangle::NamedThreadFactory>("BootstrapWorker"))
+      , acceptorFactory_(acceptorFactory)
+    {}
+  virtual std::thread newThread(folly::wangle::Func&& func) override;
+
+  void setInternalFactory(
+    std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory);
+  void setNamePrefix(folly::StringPiece prefix);
+
+  template <typename F>
+  void forEachWorker(F&& f);
+
+ private:
+  std::shared_ptr<folly::wangle::NamedThreadFactory> internalFactory_;
+  folly::RWSpinLock workersLock_;
+  std::map<int32_t, std::shared_ptr<Acceptor>> workers_;
+  int32_t nextWorkerId_{0};
+
+  std::shared_ptr<AcceptorFactory> acceptorFactory_;
+};
+
+template <typename F>
+void ServerWorkerFactory::forEachWorker(F&& f) {
+  folly::RWSpinLock::ReadHolder guard(workersLock_);
+  for (const auto& kv : workers_) {
+    f(kv.second.get());
+  }
+}
+
+} // namespace
diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp b/folly/experimental/wangle/bootstrap/ServerBootstrap.cpp
new file mode 100644 (file)
index 0000000..17e4bb8
--- /dev/null
@@ -0,0 +1,54 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include <folly/experimental/wangle/bootstrap/ServerBootstrap.h>
+#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/io/async/EventBaseManager.h>
+
+namespace folly {
+
+std::thread ServerWorkerFactory::newThread(
+    folly::wangle::Func&& func) {
+  return internalFactory_->newThread([=](){
+    auto id = nextWorkerId_++;
+    auto worker = acceptorFactory_->newAcceptor();
+    {
+      folly::RWSpinLock::WriteHolder guard(workersLock_);
+      workers_.insert({id, worker});
+    }
+    EventBaseManager::get()->setEventBase(worker->getEventBase(), false);
+    func();
+    EventBaseManager::get()->clearEventBase();
+
+    worker->drainAllConnections();
+    {
+      folly::RWSpinLock::WriteHolder guard(workersLock_);
+      workers_.erase(id);
+    }
+  });
+}
+
+void ServerWorkerFactory::setInternalFactory(
+  std::shared_ptr<wangle::NamedThreadFactory> internalFactory) {
+  CHECK(workers_.empty());
+  internalFactory_ = internalFactory;
+}
+
+void ServerWorkerFactory::setNamePrefix(folly::StringPiece prefix) {
+  CHECK(workers_.empty());
+  internalFactory_->setNamePrefix(prefix);
+}
+
+} // namespace
diff --git a/folly/experimental/wangle/bootstrap/ServerBootstrap.h b/folly/experimental/wangle/bootstrap/ServerBootstrap.h
new file mode 100644 (file)
index 0000000..8f29cb5
--- /dev/null
@@ -0,0 +1,191 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#pragma once
+
+#include <folly/experimental/wangle/bootstrap/ServerBootstrap-inl.h>
+
+namespace folly {
+
+/*
+ * ServerBootstrap is a parent class intended to set up a
+ * high-performance TCP accepting server.  It will manage a pool of
+ * accepting threads, any number of accepting sockets, a pool of
+ * IO-worker threads, and connection pool for each IO thread for you.
+ *
+ * The output is given as a ChannelPipeline template: given a
+ * PipelineFactory, it will create a new pipeline for each connection,
+ * and your server can handle the incoming bytes.
+ *
+ * BACKWARDS COMPATIBLITY: for servers already taking a pool of
+ * Acceptor objects, an AcceptorFactory can be given directly instead
+ * of a pipeline factory.
+ */
+template <typename Pipeline>
+class ServerBootstrap {
+ public:
+  /* TODO(davejwatson)
+   *
+   * If there is any work to be done BEFORE handing the work to IO
+   * threads, this handler is where the pipeline to do it would be
+   * set.
+   *
+   * This could be used for things like logging, load balancing, or
+   * advanced load balancing on IO threads.  Netty also provides this.
+   */
+  ServerBootstrap* handler() {
+    return this;
+  }
+
+  /*
+   * BACKWARDS COMPATIBILITY - an acceptor factory can be set.  Your
+   * Acceptor is responsible for managing the connection pool.
+   *
+   * @param childHandler - acceptor factory to call for each IO thread
+   */
+  ServerBootstrap* childHandler(std::shared_ptr<AcceptorFactory> childHandler) {
+    acceptorFactory_ = childHandler;
+    return this;
+  }
+
+  /*
+   * Set a pipeline factory that will be called for each new connection
+   *
+   * @param factory pipeline factory to use for each new connection
+   */
+  ServerBootstrap* childPipeline(
+      std::shared_ptr<PipelineFactory<Pipeline>> factory) {
+    pipelineFactory_ = factory;
+    return this;
+  }
+
+  /*
+   * Set the IO executor.  If not set, a default one will be created
+   * with one thread per core.
+   *
+   * @param io_group - io executor to use for IO threads.
+   */
+  ServerBootstrap* group(
+      std::shared_ptr<folly::wangle::IOThreadPoolExecutor> io_group) {
+    return group(nullptr, io_group);
+  }
+
+  /*
+   * Set the acceptor executor, and IO executor.
+   *
+   * If no acceptor executor is set, a single thread will be created for accepts
+   * If no IO executor is set, a default of one thread per core will be created
+   *
+   * @param group - acceptor executor to use for acceptor threads.
+   * @param io_group - io executor to use for IO threads.
+   */
+  ServerBootstrap* group(
+      std::shared_ptr<folly::wangle::IOThreadPoolExecutor> accept_group,
+      std::shared_ptr<wangle::IOThreadPoolExecutor> io_group) {
+    if (!accept_group) {
+      accept_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
+        1, std::make_shared<wangle::NamedThreadFactory>("Acceptor Thread"));
+    }
+    if (!io_group) {
+      io_group = std::make_shared<folly::wangle::IOThreadPoolExecutor>(
+        32, std::make_shared<wangle::NamedThreadFactory>("IO Thread"));
+    }
+    auto factory = io_group->getThreadFactory();
+
+    //CHECK(factory == nullptr); // TODO
+
+    CHECK(acceptorFactory_ || pipelineFactory_);
+
+    if (acceptorFactory_) {
+      workerFactory_ = std::make_shared<ServerWorkerFactory>(
+        acceptorFactory_);
+    } else {
+      workerFactory_ = std::make_shared<ServerWorkerFactory>(
+        std::make_shared<ServerAcceptorFactory<Pipeline>>(pipelineFactory_));
+    }
+
+    acceptor_group_ = accept_group;
+    io_group_ = io_group;
+
+    auto numThreads = io_group_->numThreads();
+    io_group_->setNumThreads(0);
+    io_group_->setThreadFactory(workerFactory_);
+    io_group_->setNumThreads(numThreads);
+
+    return this;
+  }
+
+  /*
+   * Bind to a port and start listening.
+   * One of childPipeline or childHandler must be called before bind
+   *
+   * @param port Port to listen on
+   */
+  void bind(int port) {
+    // TODO bind to v4 and v6
+    // TODO take existing socket
+    // TODO use the acceptor thread
+    auto socket = folly::AsyncServerSocket::newSocket(
+      EventBaseManager::get()->getEventBase());
+    sockets_.push_back(socket);
+    socket->bind(port);
+
+    // TODO Take ServerSocketConfig
+    socket->listen(1024);
+
+    if (!workerFactory_) {
+      group(nullptr);
+    }
+
+    // Startup all the threads
+    workerFactory_->forEachWorker([this, socket](Acceptor* worker){
+        socket->getEventBase()->runInEventBaseThread([this, worker, socket](){
+          socket->addAcceptCallback(worker, worker->getEventBase());
+        });
+    });
+    socket->startAccepting();
+  }
+
+  /*
+   * Stop listening on all sockets.
+   */
+  void stop() {
+    for (auto& socket : sockets_) {
+      socket->stopAccepting();
+    }
+    acceptor_group_->join();
+    io_group_->join();
+  }
+
+  /*
+   * Get the list of listening sockets
+   */
+  std::vector<std::shared_ptr<folly::AsyncServerSocket>>&
+  getSockets() {
+    return sockets_;
+  }
+
+ private:
+  std::shared_ptr<wangle::IOThreadPoolExecutor> acceptor_group_;
+  std::shared_ptr<wangle::IOThreadPoolExecutor> io_group_;
+
+  std::shared_ptr<ServerWorkerFactory> workerFactory_;
+  std::vector<std::shared_ptr<folly::AsyncServerSocket>> sockets_;
+
+  std::shared_ptr<AcceptorFactory> acceptorFactory_;
+  std::shared_ptr<PipelineFactory<Pipeline>> pipelineFactory_;
+};
+
+} // namespace
index 89212df57df5abfffea562c4cce083f97143c0f4..de80a856cf0fac290362a23fb41a71a641741f03 100644 (file)
@@ -354,3 +354,16 @@ class ChannelPipeline<Handler, Handlers...>
 };
 
 }}
+
+namespace folly {
+
+class AsyncSocket;
+
+template <typename Pipeline>
+class PipelineFactory {
+ public:
+  virtual Pipeline* newPipeline(std::shared_ptr<AsyncSocket>) = 0;
+  virtual ~PipelineFactory() {}
+};
+
+}
index b1a3dd3854af6658aa3e74e8a758b7afa76fc6f6..52c7d1f789d1f7200ac84eaf59a6893c901ac20e 100644 (file)
@@ -50,6 +50,10 @@ class ThreadPoolExecutor : public Executor {
     threadFactory_ = std::move(threadFactory);
   }
 
+  std::shared_ptr<ThreadFactory> getThreadFactory(void) {
+    return threadFactory_;
+  }
+
   size_t numThreads();
   void setNumThreads(size_t numThreads);
   void stop();