priority CPU thread pool
author James Sedgwick <jsedgwick@fb.com>
Mon, 17 Nov 2014 22:44:10 +0000 (14:44 -0800)
committer Dave Watson <davejwatson@fb.com>
Wed, 19 Nov 2014 20:52:31 +0000 (12:52 -0800)
Summary:
just extend CPUThreadPoolExecutor to use a queue that is itself composed of N mpmc queues, one per priority

the verbosity is starting to kill me, i had thought before of truncating Executor of all these pool types and now I'm definitely going to do that unless someone fights me.

Test Plan: added unit; maybe I'm not being clever enough, as I couldn't think of many ways to test this reliably, so there's just a basic preemption test

Reviewed By: davejwatson@fb.com

Subscribers: trunkagent, fugalh, njormrod, folly-diffs@, bmatheny

FB internal diff: D1676452

Tasks: 5002392

Signature: t1:1676452:1416263990:cdf5d44e4a50a6180ba547a3ed4c0c24d4ffdd8f

folly/experimental/wangle/concurrent/BlockingQueue.h
folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.cpp
folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h
folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h [new file with mode: 0644]
folly/experimental/wangle/concurrent/test/ThreadPoolExecutorTest.cpp

index 6a6535448c2f17e25c7b371e7e6cdabf2aa29e9c..08a1f70372482d56c71ff8f54ec469b16713fc4c 100644 (file)
@@ -15,6 +15,9 @@
  */
 
 #pragma once
+
+#include <glog/logging.h>
+
 namespace folly { namespace wangle {
 
 template <class T>
@@ -22,6 +25,16 @@ class BlockingQueue {
  public:
   virtual ~BlockingQueue() {}
   virtual void add(T item) = 0;
+  virtual void addWithPriority(T item, uint32_t priority) {
+    LOG_FIRST_N(WARNING, 1) <<
+      "addWithPriority() called on a non-priority queue";
+    add(std::move(item));
+  }
+  virtual uint32_t getNumPriorities() {
+    LOG_FIRST_N(WARNING, 1) <<
+      "getNumPriorities() called on a non-priority queue";
+    return 1;
+  }
   virtual T take() = 0;
   virtual size_t size() = 0;
 };
index 4ef2045451ba7ebf98065693623394c8844c2585..9caf6bee3c964e9c1335d7e0c97979386c152779 100644 (file)
  */
 
 #include <folly/experimental/wangle/concurrent/CPUThreadPoolExecutor.h>
+#include <folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h>
 
 namespace folly { namespace wangle {
 
 const size_t CPUThreadPoolExecutor::kDefaultMaxQueueSize = 1 << 18;
+const size_t CPUThreadPoolExecutor::kDefaultNumPriorities = 2;
 
 CPUThreadPoolExecutor::CPUThreadPoolExecutor(
     size_t numThreads,
@@ -30,6 +32,31 @@ CPUThreadPoolExecutor::CPUThreadPoolExecutor(
   CHECK(threadList_.get().size() == numThreads);
 }
 
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
+              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+          std::move(threadFactory)) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(size_t numThreads)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          std::make_shared<NamedThreadFactory>("CPUThreadPool")) {}
+
+CPUThreadPoolExecutor::CPUThreadPoolExecutor(
+    size_t numThreads,
+    uint32_t numPriorities,
+    std::shared_ptr<ThreadFactory> threadFactory)
+    : CPUThreadPoolExecutor(
+          numThreads,
+          folly::make_unique<PriorityLifoSemMPMCQueue<CPUTask>>(
+              numPriorities,
+              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+          std::move(threadFactory)) {}
+
 CPUThreadPoolExecutor::~CPUThreadPoolExecutor() {
   stop();
   CHECK(threadsToStop_ == 0);
@@ -48,6 +75,30 @@ void CPUThreadPoolExecutor::add(
       CPUTask(std::move(func), expiration, std::move(expireCallback)));
 }
 
+void CPUThreadPoolExecutor::add(Func func, uint32_t priority) {
+  add(std::move(func), priority, std::chrono::milliseconds(0));
+}
+
+void CPUThreadPoolExecutor::add(
+    Func func,
+    uint32_t priority,
+    std::chrono::milliseconds expiration,
+    Func expireCallback) {
+  CHECK(priority < getNumPriorities());
+  taskQueue_->addWithPriority(
+      CPUTask(std::move(func), expiration, std::move(expireCallback)),
+      priority);
+}
+
+uint32_t CPUThreadPoolExecutor::getNumPriorities() const {
+  return taskQueue_->getNumPriorities();
+}
+
+BlockingQueue<CPUThreadPoolExecutor::CPUTask>*
+CPUThreadPoolExecutor::getTaskQueue() {
+  return taskQueue_.get();
+}
+
 void CPUThreadPoolExecutor::threadRun(std::shared_ptr<Thread> thread) {
   thread->startupBaton.post();
   while (1) {
index f331232f4b9df6a88a3b9aa8d6d884da3ea91764..b7e88685f0cda9367d4c3c24db9bac1dc8d9b64c 100644 (file)
@@ -15,6 +15,7 @@
  */
 
 #pragma once
+
 #include <folly/experimental/wangle/concurrent/ThreadPoolExecutor.h>
 
 namespace folly { namespace wangle {
@@ -25,9 +26,19 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
 
   explicit CPUThreadPoolExecutor(
       size_t numThreads,
-      std::unique_ptr<BlockingQueue<CPUTask>> taskQueue =
-          folly::make_unique<LifoSemMPMCQueue<CPUTask>>(
-              CPUThreadPoolExecutor::kDefaultMaxQueueSize),
+      std::unique_ptr<BlockingQueue<CPUTask>> taskQueue,
+      std::shared_ptr<ThreadFactory> threadFactory =
+          std::make_shared<NamedThreadFactory>("CPUThreadPool"));
+
+  explicit CPUThreadPoolExecutor(size_t numThreads);
+
+  explicit CPUThreadPoolExecutor(
+      size_t numThreads,
+      std::shared_ptr<ThreadFactory> threadFactory);
+
+  explicit CPUThreadPoolExecutor(
+      size_t numThreads,
+      uint32_t numPriorities,
       std::shared_ptr<ThreadFactory> threadFactory =
           std::make_shared<NamedThreadFactory>("CPUThreadPool"));
 
@@ -39,6 +50,15 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
       std::chrono::milliseconds expiration,
       Func expireCallback = nullptr) override;
 
+  void add(Func func, uint32_t priority);
+  void add(
+      Func func,
+      uint32_t priority,
+      std::chrono::milliseconds expiration,
+      Func expireCallback = nullptr);
+
+  uint32_t getNumPriorities() const;
+
   struct CPUTask : public ThreadPoolExecutor::Task {
     // Must be noexcept move constructible so it can be used in MPMCQueue
     explicit CPUTask(
@@ -57,6 +77,10 @@ class CPUThreadPoolExecutor : public ThreadPoolExecutor {
   };
 
   static const size_t kDefaultMaxQueueSize;
+  static const size_t kDefaultNumPriorities;
+
+ protected:
+  BlockingQueue<CPUTask>* getTaskQueue();
 
  private:
   void threadRun(ThreadPtr thread) override;
diff --git a/folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h b/folly/experimental/wangle/concurrent/PriorityLifoSemMPMCQueue.h
new file mode 100644 (file)
index 0000000..65500f5
--- /dev/null
@@ -0,0 +1,77 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#pragma once
+#include <folly/experimental/wangle/concurrent/BlockingQueue.h>
+#include <folly/LifoSem.h>
+#include <folly/MPMCQueue.h>
+
+namespace folly { namespace wangle {
+
+template <class T>
+class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
+ public:
+  explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
+    CHECK(numPriorities > 0);
+    queues_.reserve(numPriorities);
+    for (int i = 0; i < numPriorities; i++) {
+      queues_.push_back(MPMCQueue<T>(capacity));
+    }
+  }
+
+  uint32_t getNumPriorities() override {
+    return queues_.size();
+  }
+
+  // Add at lowest priority by default
+  void add(T item) override {
+    addWithPriority(std::move(item), 0);
+  }
+
+  void addWithPriority(T item, uint32_t priority) override {
+    CHECK(priority < queues_.size());
+    if (!queues_[priority].write(std::move(item))) {
+      throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
+    }
+    sem_.post();
+  }
+
+  T take() override {
+    T item;
+    while (true) {
+      for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
+        if (it->read(item)) {
+          return item;
+        }
+      }
+      sem_.wait();
+    }
+  }
+
+  size_t size() override {
+    size_t size = 0;
+    for (auto& q : queues_) {
+      size += q.size();
+    }
+    return size;
+  }
+
+ private:
+  LifoSem sem_;
+  std::vector<MPMCQueue<T>> queues_;
+};
+
+}} // folly::wangle
index 471c8c6dc63d057311ed3aaa114bb500c5361147..6e3782ce40724af14714978abdf7473368c2e9d8 100644 (file)
@@ -277,3 +277,26 @@ TEST(ThreadPoolExecutorTest, CPUFuturePool) {
 TEST(ThreadPoolExecutorTest, IOFuturePool) {
   futureExecutor<IOThreadPoolExecutor>();
 }
+
+TEST(ThreadPoolExecutorTest, PriorityPreemptionTest) {
+  bool tookLopri = false;
+  auto completed = 0;
+  auto hipri = [&] {
+    EXPECT_FALSE(tookLopri);
+    completed++;
+  };
+  auto lopri = [&] {
+    tookLopri = true;
+    completed++;
+  };
+  CPUThreadPoolExecutor pool(0, 2);
+  for (int i = 0; i < 50; i++) {
+    pool.add(lopri, 0);
+  }
+  for (int i = 0; i < 50; i++) {
+    pool.add(hipri, 1);
+  }
+  pool.setNumThreads(1);
+  pool.join();
+  EXPECT_EQ(100, completed);
+}