Allow to specify per-priority capacities in PriorityLifoSemMPMCQueue
authorGiuseppe Ottaviano <ott@fb.com>
Tue, 7 Nov 2017 06:41:52 +0000 (22:41 -0800)
committerFacebook Github Bot <facebook-github-bot@users.noreply.github.com>
Tue, 7 Nov 2017 06:50:39 +0000 (22:50 -0800)
Summary: The `THROW` behavior of `LifoSemMPMCQueue` is unsafe when calling `join()`, because the queue may be full and `join()` will fail to enqueue the poisons. To work around this we can use `PriorityLifoSemMPMCQueue` and dedicate `LO_PRI` to the poisons, but there's no reason that the low priority queue should have the same size as the normal priority. Add a constructor to be able to specify different sizes.

Reviewed By: yfeldblum

Differential Revision: D6257017

fbshipit-source-id: c75f33c38fcdad646ba1499bcd434ab65711250c

folly/executors/CPUThreadPoolExecutor.h
folly/executors/task_queue/PriorityLifoSemMPMCQueue.h
folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp [new file with mode: 0644]

index 64f1b73c7bea691cc1757f8f5b488b756e1ee968..153fc7060850c98a38eabb3515228c67602ec7b0 100644 (file)
@@ -29,6 +29,14 @@ namespace folly {
  * the same queue. MPMC queue excels in this situation but dictates a max queue
  * size.
  *
+ * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
+ * so add() can fail. Furthermore, join() can also fail if the queue is full,
+ * because it enqueues numThreads poison tasks to stop the threads. If join() is
+ * needed to be guaranteed to succeed PriorityLifoSemMPMCQueue can be used
+ * instead, initializing the lowest priority's (LO_PRI) capacity to at least
+ * numThreads. Poisons use LO_PRI so if that priority is not used for any user
+ * task join() is guaranteed not to encounter a full queue.
+ *
  * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
  * tasks executing on a given thread pool schedule more tasks, deadlock is
  * possible if the queue becomes full.  Deadlock is also possible if there is
index ab9d12efb230fd2b62fa6068a6ad049c061309dd..54d72f96ee8f458b8dcf8054cd6927a02836735c 100644 (file)
 
 #include <folly/Executor.h>
 #include <folly/MPMCQueue.h>
+#include <folly/Range.h>
 #include <folly/executors/task_queue/BlockingQueue.h>
 #include <folly/synchronization/LifoSem.h>
+#include <glog/logging.h>
 
 namespace folly {
 
@@ -36,6 +38,15 @@ class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
     }
   }
 
+  PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) {
+    CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
+
+    queues_.reserve(capacities.size());
+    for (auto capacity : capacities) {
+      queues_.emplace_back(capacity);
+    }
+  }
+
   uint8_t getNumPriorities() override {
     return queues_.size();
   }
diff --git a/folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp b/folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp
new file mode 100644 (file)
index 0000000..ba8c7eb
--- /dev/null
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <folly/Range.h>
+#include <folly/container/Enumerate.h>
+#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
+#include <folly/portability/GTest.h>
+
+using namespace folly;
+
+TEST(PriorityLifoSemMPMCQueue, Capacities) {
+  const std::vector<size_t> capacities = {1, 2, 3};
+  PriorityLifoSemMPMCQueue<int, QueueBehaviorIfFull::THROW> q(
+      folly::range(capacities));
+
+  for (auto capacity : folly::enumerate(capacities)) {
+    auto pri = static_cast<int8_t>(capacity.index) - 1;
+    for (size_t i = 0; i < *capacity; ++i) {
+      EXPECT_NO_THROW(q.addWithPriority(0, pri)) << *capacity << " " << i;
+    }
+    EXPECT_THROW(q.addWithPriority(0, pri), QueueFullException) << *capacity;
+  }
+}