ThreadPoolExecutor and its children CPUThreadPoolExecutor and IOThreadPoolExecutor
authorJames Sedgwick <jsedgwick@fb.com>
Wed, 17 Sep 2014 09:58:06 +0000 (02:58 -0700)
committerDave Watson <davejwatson@fb.com>
Wed, 17 Sep 2014 18:23:48 +0000 (11:23 -0700)
Summary:
Spun off from https://phabricator.fb.com/D1534506 as this seemed different enough for a new diff

Similar to previous diff but attempts to reuse a common thread management process between cpu and io bound thread pools. Also sets the stage for other common functionality, e.g. stats, monitoring, timeouts, and so on

Here is some output from the queue benchmark in common/concurrent with both of these pools added (changes to BM not in this diff): https://phabricator.fb.com/P16308560

Test Plan: added a unit test, ran benchmark

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, njormrod, bmatheny

FB internal diff: D1555443

Tasks: 50023925002425

12 files changed:
folly/experimental/wangle/concurrent/BlockingQueue.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp [new file with mode: 0644]
folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/Executor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp [new file with mode: 0644]
folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/NamedThreadFactory.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/ThreadFactory.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp [new file with mode: 0644]
folly/experimental/wangle/concurrent/ThreadPoolExecutor.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp [new file with mode: 0644]

diff --git a/folly/experimental/wangle/concurrent/BlockingQueue.h b/folly/experimental/wangle/concurrent/BlockingQueue.h
new file mode 100644 (file)
index 0000000..6a65354
--- /dev/null
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+namespace folly { namespace wangle {
+
+template <class T>
+class BlockingQueue {
+ public:
+  virtual ~BlockingQueue() {}
+  virtual void add(T item) = 0;
+  virtual T take() = 0;
+  virtual size_t size() = 0;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..4a133fb
--- /dev/null
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    std::unique_ptr<BlockingQueue<Task>> taskQueue,
+    std::unique_ptr<ThreadFactory> threadFactory)
+    : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+      taskQueue_(std::move(taskQueue)) {
+  addThreads(numThreads);
+  CHECK(threadList_.get().size() == numThreads);
+}
+
+CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
+  stop();
+  CHECK(threadsToStop_ == 0);
+}
+
+void CPUThreadPoolExecutor::add(Func func) {
+  // TODO handle enqueue failure, here and in other add() callsites
+  taskQueue_->add(Task(std::move(func)));
+}
+
+void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
+  while (1) {
+    // TODO expiration / codel
+    auto t = taskQueue_->take();
+    if (UNLIKELY(t.poison)) {
+      CHECK(threadsToStop_-- > 0);
+      stoppedThreads_.add(thread);
+      return;
+    } else {
+      thread->idle = false;
+      try {
+        t.func();
+      } catch (const std::exception& e) {
+        LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled " <<
+                      typeid(e).name() << " exception: " << e.what();
+      } catch (...) {
+        LOG(ERROR) << "CPUThreadPoolExecutor: func threw unhandled non-exception "
+                      "object";
+      }
+      thread->idle = true;
+    }
+
+    if (UNLIKELY(threadsToStop_ > 0 && !isJoin_)) {
+      if (--threadsToStop_ >= 0) {
+        stoppedThreads_.add(thread);
+        return;
+      } else {
+        threadsToStop_++;
+      }
+    }
+  }
+}
+
+void CPUThreadPoolExecutor::stopThreads(size_t n) {
+  CHECK(stoppedThreads_.size() == 0);
+  threadsToStop_ = n;
+  for (int i = 0; i < n; i++) {
+    taskQueue_->add(Task());
+  }
+}
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..210fd67
--- /dev/null
@@ -0,0 +1,60 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+class CPUThreadPoolExecutor : public ThreadPoolExecutor {
+ public:
+  struct Task;
+
+  // TODO thread naming, perhaps a required input to ThreadFactories
+  explicit CPUThreadPoolExecutor(
+      size_t numThreads,
+      std::unique_ptr<BlockingQueue<Task>> taskQueue =
+          folly::make_unique<LifoSemMPMCQueue<Task>>(
+              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+      std::unique_ptr<ThreadFactory> threadFactory =
+          folly::make_unique<NamedThreadFactory>("CPUThreadPool"));
+
+  ~CPUThreadPoolExecutor();
+
+  void add(Func func) override;
+
+  struct Task {
+    explicit Task(Func&& taskArg) : func(std::move(taskArg)), poison(false) {}
+    Task() : func(nullptr), poison(true) {}
+    Task(Task&& o) noexcept : func(std::move(o.func)), poison(o.poison) {}
+    Task(const Task&) = default;
+    Task& operator=(const Task&) = default;
+    Func func;
+    bool poison;
+    // TODO per-task stats, timeouts, expirations
+  };
+
+  static const size_t kDefaultMaxQueueSize;
+
+ private:
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+
+  std::atomic<size_t> threadsToStop_;
+  std::unique_ptr<BlockingQueue<Task>> taskQueue_;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/Executor.h b/folly/experimental/wangle/concurrent/Executor.h
new file mode 100644 (file)
index 0000000..49db177
--- /dev/null
@@ -0,0 +1,31 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+
+#include <functional>
+
+namespace folly { namespace wangle {
+
+typedef std::function<void()> Func;
+
+class Executor {
+ public:
+  virtual ~Executor() {};
+  virtual void add(Func func) = 0;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..cdc36ef
--- /dev/null
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+
+#include <folly/MoveWrapper.h>
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+IOThreadPoolExecutor::IOThreadPoolExecutor(
+    size_t numThreads,
+    std::unique_ptr<ThreadFactory> threadFactory)
+  : ThreadPoolExecutor(numThreads, std::move(threadFactory)),
+    nextThread_(0) {
+  addThreads(numThreads);
+  CHECK(threadList_.get().size() == numThreads);
+}
+
+IOThreadPoolExecutor::~IOThreadPoolExecutor() {
+  stop();
+}
+
+void IOThreadPoolExecutor::add(Func func) {
+  RWSpinLock::ReadHolder{&threadListLock_};
+  if (threadList_.get().empty()) {
+    throw std::runtime_error("No threads available");
+  }
+  auto thread = threadList_.get()[nextThread_++ % threadList_.get().size()];
+  auto ioThread = std::static_pointer_cast<IOThread>(thread);
+
+  auto moveFunc = folly::makeMoveWrapper(std::move(func));
+  auto wrappedFunc = [moveFunc, ioThread] () {
+    (*moveFunc)();
+    ioThread->outstandingTasks--;
+  };
+
+  ioThread->outstandingTasks++;
+  if (!ioThread->eventBase.runInEventBaseThread(std::move(wrappedFunc))) {
+    ioThread->outstandingTasks--;
+    throw std::runtime_error("Unable to run func in event base thread");
+  }
+}
+
+std::shared_ptr<ThreadPoolExecutor::Thread>
+IOThreadPoolExecutor::makeThread() {
+  return std::make_shared<IOThread>();
+}
+
+void IOThreadPoolExecutor::threadRun(ThreadPtr thread) {
+  const auto ioThread = std::static_pointer_cast<IOThread>(thread);
+  while (ioThread->shouldRun) {
+    ioThread->eventBase.loopForever();
+  }
+  if (isJoin_) {
+    while (ioThread->outstandingTasks > 0) {
+      ioThread->eventBase.loopOnce();
+    }
+  }
+  stoppedThreads_.add(ioThread);
+}
+
+void IOThreadPoolExecutor::stopThreads(size_t n) {
+  for (int i = 0; i < n; i++) {
+    const auto ioThread = std::static_pointer_cast<IOThread>(
+        threadList_.get()[i]);
+    ioThread->shouldRun = false;
+    ioThread->eventBase.terminateLoopSoon();
+  }
+}
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..2b498bd
--- /dev/null
@@ -0,0 +1,49 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+#include <folly/io/async/EventBase.h>
+
+namespace folly { namespace wangle {
+
+class IOThreadPoolExecutor : public ThreadPoolExecutor {
+ public:
+  explicit IOThreadPoolExecutor(
+      size_t numThreads,
+      std::unique_ptr<ThreadFactory> threadFactory =
+          folly::make_unique<NamedThreadFactory>("IOThreadPool"));
+
+  ~IOThreadPoolExecutor();
+
+  void add(Func func) override;
+
+ private:
+  ThreadPtr makeThread() override;
+  void threadRun(ThreadPtr thread) override;
+  void stopThreads(size_t n) override;
+
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING IOThread : public Thread {
+    IOThread() : shouldRun(true), outstandingTasks(0) {};
+    std::atomic<bool> shouldRun;
+    std::atomic<size_t> outstandingTasks;
+    EventBase eventBase;
+  };
+
+  size_t nextThread_;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h b/folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..45d362e
--- /dev/null
@@ -0,0 +1,57 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/BlockingQueue.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class LifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  explicit LifoSemMPMCQueue(size_t capacity) : queue_(capacity) {}
+
+  void add(T item) override {
+    if (!queue_.write(std::move(item))) {
+      throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (!queue_.read(item)) {
+      sem_.wait();
+    }
+    return item;
+  }
+
+  size_t capacity() {
+    return queue_.capacity();
+  }
+
+  size_t size() override {
+    return queue_.size();
+  }
+
+ private:
+  LifoSem sem_;
+  MPMCQueue<T> queue_;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/NamedThreadFactory.h b/folly/experimental/wangle/concurrent/NamedThreadFactory.h
new file mode 100644 (file)
index 0000000..5c513c5
--- /dev/null
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/ThreadFactory.h>
+#include <folly/Conv.h>
+#include <folly/ThreadName.h>
+
+namespace folly { namespace wangle {
+
+class NamedThreadFactory : public ThreadFactory {
+ public:
+  explicit NamedThreadFactory(folly::StringPiece prefix)
+    : prefix_(prefix), suffix_(0) {}
+
+  std::thread newThread(Func&& func) override {
+    auto thread = std::thread(std::move(func));
+    folly::setThreadName(
+        thread.native_handle(),
+        folly::to<std::string>(prefix_, suffix_++));
+    return thread;
+  }
+
+ private:
+  folly::StringPiece prefix_;
+  std::atomic<uint64_t> suffix_;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/ThreadFactory.h b/folly/experimental/wangle/concurrent/ThreadFactory.h
new file mode 100644 (file)
index 0000000..b5da075
--- /dev/null
@@ -0,0 +1,30 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/Executor.h>
+
+#include <thread>
+
+namespace folly { namespace wangle {
+
+class ThreadFactory {
+ public:
+  virtual ~ThreadFactory() {}
+  virtual std::thread newThread(Func&& func) = 0;
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.cpp
new file mode 100644 (file)
index 0000000..4d249b0
--- /dev/null
@@ -0,0 +1,110 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+
+namespace folly { namespace wangle {
+
+ThreadPoolExecutor::ThreadPoolExecutor(
+    size_t numThreads,
+    std::unique_ptr<ThreadFactory> threadFactory)
+    : threadFactory_(std::move(threadFactory)) {}
+
+ThreadPoolExecutor::~ThreadPoolExecutor() {
+  CHECK(threadList_.get().size() == 0);
+}
+
+size_t ThreadPoolExecutor::numThreads() {
+  RWSpinLock::ReadHolder{&threadListLock_};
+  return threadList_.get().size();
+}
+
+void ThreadPoolExecutor::setNumThreads(size_t n) {
+  RWSpinLock::WriteHolder{&threadListLock_};
+  const auto current = threadList_.get().size();
+  if (n > current ) {
+    addThreads(n - current);
+  } else if (n < current) {
+    removeThreads(current - n, true);
+  }
+  CHECK(threadList_.get().size() == n);
+}
+
+void ThreadPoolExecutor::addThreads(size_t n) {
+  for (int i = 0; i < n; i++) {
+    auto thread = makeThread();
+    // TODO need a notion of failing to create the thread
+    // and then handling for that case
+    thread->handle = threadFactory_->newThread(
+        std::bind(&ThreadPoolExecutor::threadRun, this, thread));
+    threadList_.add(thread);
+  }
+}
+
+void ThreadPoolExecutor::removeThreads(size_t n, bool isJoin) {
+  CHECK(n <= threadList_.get().size());
+  CHECK(stoppedThreads_.size() == 0);
+  isJoin_ = isJoin;
+  stopThreads(n);
+  for (int i = 0; i < n; i++) {
+    auto thread = stoppedThreads_.take();
+    thread->handle.join();
+    threadList_.remove(thread);
+  }
+  CHECK(stoppedThreads_.size() == 0);
+}
+
+void ThreadPoolExecutor::stop() {
+  RWSpinLock::WriteHolder{&threadListLock_};
+  removeThreads(threadList_.get().size(), false);
+  CHECK(threadList_.get().size() == 0);
+}
+
+void ThreadPoolExecutor::join() {
+  RWSpinLock::WriteHolder{&threadListLock_};
+  removeThreads(threadList_.get().size(), true);
+  CHECK(threadList_.get().size() == 0);
+}
+
+std::atomic<uint64_t> ThreadPoolExecutor::Thread::nextId(0);
+
+void ThreadPoolExecutor::StoppedThreadQueue::add(
+    ThreadPoolExecutor::ThreadPtr item) {
+  std::lock_guard<std::mutex> guard(mutex_);
+  queue_.push(std::move(item));
+  sem_.post();
+}
+
+ThreadPoolExecutor::ThreadPtr ThreadPoolExecutor::StoppedThreadQueue::take() {
+  while(1) {
+    {
+      std::lock_guard<std::mutex> guard(mutex_);
+      if (queue_.size() > 0) {
+        auto item = std::move(queue_.front());
+        queue_.pop();
+        return item;
+      }
+    }
+    sem_.wait();
+  }
+}
+
+size_t ThreadPoolExecutor::StoppedThreadQueue::size() {
+  std::lock_guard<std::mutex> guard(mutex_);
+  return queue_.size();
+}
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h b/folly/experimental/wangle/concurrent/ThreadPoolExecutor.h
new file mode 100644 (file)
index 0000000..f802af1
--- /dev/null
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/Executor.h>
+#include <folly/experimental/wangle/concurrent/LifoSemMPMCQueue.h>
+#include <folly/experimental/wangle/concurrent/NamedThreadFactory.h>
+#include <folly/Memory.h>
+#include <folly/RWSpinLock.h>
+
+#include <algorithm>
+#include <mutex>
+#include <queue>
+
+#include <glog/logging.h>
+
+namespace folly { namespace wangle {
+
+class ThreadPoolExecutor : public Executor {
+ public:
+  explicit ThreadPoolExecutor(
+      size_t numThreads,
+      std::unique_ptr<ThreadFactory> threadFactory);
+
+  ~ThreadPoolExecutor();
+
+  size_t numThreads();
+  void setNumThreads(size_t numThreads);
+  void stop();
+  void join();
+  // TODO expose stats
+
+ protected:
+  void addThreads(size_t n);
+  void removeThreads(size_t n, bool isJoin);
+
+  struct FOLLY_ALIGN_TO_AVOID_FALSE_SHARING Thread {
+    virtual ~Thread() {}
+    Thread() : id(nextId++), handle(), idle(true) {};
+    static std::atomic<uint64_t> nextId;
+    uint64_t id;
+    std::thread handle;
+    bool idle;
+    // TODO per-thread stats go here
+  };
+
+  typedef std::shared_ptr<Thread> ThreadPtr;
+
+  // The function that will be bound to pool threads
+  virtual void threadRun(ThreadPtr thread) = 0;
+  // Stop n threads and put their Thread structs in the threadsStopped_ queue
+  virtual void stopThreads(size_t n) = 0;
+  // Create a suitable Thread struct
+  virtual ThreadPtr makeThread() {
+    return std::make_shared<Thread>();
+  }
+  // need a stopThread(id) for keepalive feature
+
+  class ThreadList {
+   public:
+    void add(const ThreadPtr& state) {
+      auto it = std::lower_bound(vec_.begin(), vec_.end(), state, compare);
+      vec_.insert(it, state);
+    }
+
+    void remove(const ThreadPtr& state) {
+      auto itPair = std::equal_range(vec_.begin(), vec_.end(), state, compare);
+      CHECK(itPair.first != vec_.end());
+      CHECK(std::next(itPair.first) == itPair.second);
+      vec_.erase(itPair.first);
+    }
+
+    const std::vector<ThreadPtr>& get() const {
+      return vec_;
+    }
+
+   private:
+    static bool compare(const ThreadPtr& ts1, const ThreadPtr& ts2) {
+      return ts1->id < ts2->id;
+    }
+
+    std::vector<ThreadPtr> vec_;
+  };
+
+  class StoppedThreadQueue : public BlockingQueue<ThreadPtr> {
+   public:
+    void add(ThreadPtr item) override;
+    ThreadPtr take() override;
+    size_t size() override;
+
+   private:
+    LifoSem sem_;
+    std::mutex mutex_;
+    std::queue<ThreadPtr> queue_;
+  };
+
+  std::unique_ptr<ThreadFactory> threadFactory_;
+  ThreadList threadList_;
+  RWSpinLock threadListLock_;
+  StoppedThreadQueue stoppedThreads_;
+  std::atomic<bool> isJoin_; // whether the current downsizing is a join
+};
+
+}} // folly::wangle
diff --git a/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp b/folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp
new file mode 100644 (file)
index 0000000..fe7b8dc
--- /dev/null
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/IOThreadPoolExecutor.h>
+#include <glog/logging.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+template <class TPE>
+static void basic() {
+  // Create and destroy
+  TPE tpe(10);
+}
+
+TEST(ThreadPoolExecutorTest, CPUBasic) {
+  basic<CPUThreadPoolExecutor>();
+}
+
+TEST(IOThreadPoolExecutorTest, IOBasic) {
+  basic<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resize() {
+  TPE tpe(100);
+  EXPECT_EQ(100, tpe.numThreads());
+  tpe.setNumThreads(50);
+  EXPECT_EQ(50, tpe.numThreads());
+  tpe.setNumThreads(150);
+  EXPECT_EQ(150, tpe.numThreads());
+}
+
+TEST(ThreadPoolExecutorTest, CPUResize) {
+  resize<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResize) {
+  resize<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void stop() {
+  TPE tpe(10);
+  std::atomic<int> completed(0);
+  auto f = [&](){
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.stop();
+  EXPECT_GT(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUStop) {
+  stop<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOStop) {
+  stop<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void join() {
+  TPE tpe(10);
+  std::atomic<int> completed(0);
+  auto f = [&](){
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.join();
+  EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUJoin) {
+  join<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOJoin) {
+  join<IOThreadPoolExecutor>();
+}
+
+template <class TPE>
+static void resizeUnderLoad() {
+  TPE tpe(10);
+  std::atomic<int> completed(0);
+  auto f = [&](){
+    std::this_thread::sleep_for(std::chrono::milliseconds(1));
+    completed++;
+  };
+  for (int i = 0; i < 1000; i++) {
+    tpe.add(f);
+  }
+  tpe.setNumThreads(5);
+  tpe.setNumThreads(15);
+  tpe.join();
+  EXPECT_EQ(1000, completed);
+}
+
+TEST(ThreadPoolExecutorTest, CPUResizeUnderLoad) {
+  resizeUnderLoad<CPUThreadPoolExecutor>();
+}
+
+TEST(ThreadPoolExecutorTest, IOResizeUnderLoad) {
+  resizeUnderLoad<IOThreadPoolExecutor>();
+}