From a77fea17099fba84cb98a6bcc0aa2404defd8054 Mon Sep 17 00:00:00 2001 From: Giuseppe Ottaviano Date: Mon, 6 Nov 2017 22:41:52 -0800 Subject: [PATCH] Allow to specify per-priority capacities in PriorityLifoSemMPMCQueue Summary: The `THROW` behavior of `LifoSemMPMCQueue` is unsafe when calling `join()`, because the queue may be full and `join()` will fail to enqueue the poisons. To work around this we can use `PriorityLifoSemMPMCQueue` and dedicate `LO_PRI` to the poisons, but there's no reason that the low priority queue should have the same size as the normal priority. Add a constructor to be able to specify different sizes. Reviewed By: yfeldblum Differential Revision: D6257017 fbshipit-source-id: c75f33c38fcdad646ba1499bcd434ab65711250c --- folly/executors/CPUThreadPoolExecutor.h | 8 ++++ .../task_queue/PriorityLifoSemMPMCQueue.h | 11 ++++++ .../test/PriorityLifoSemMPMCQueueTest.cpp | 38 +++++++++++++++++++ 3 files changed, 57 insertions(+) create mode 100644 folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp diff --git a/folly/executors/CPUThreadPoolExecutor.h b/folly/executors/CPUThreadPoolExecutor.h index 64f1b73c..153fc706 100644 --- a/folly/executors/CPUThreadPoolExecutor.h +++ b/folly/executors/CPUThreadPoolExecutor.h @@ -29,6 +29,14 @@ namespace folly { * the same queue. MPMC queue excels in this situation but dictates a max queue * size. * + * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW), + * so add() can fail. Furthermore, join() can also fail if the queue is full, + * because it enqueues numThreads poison tasks to stop the threads. If join() is + * needed to be guaranteed to succeed PriorityLifoSemMPMCQueue can be used + * instead, initializing the lowest priority's (LO_PRI) capacity to at least + * numThreads. Poisons use LO_PRI so if that priority is not used for any user + * task join() is guaranteed not to encounter a full queue. + * * @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and * tasks executing on a given thread pool schedule more tasks, deadlock is * possible if the queue becomes full. Deadlock is also possible if there is diff --git a/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h b/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h index ab9d12ef..54d72f96 100644 --- a/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h +++ b/folly/executors/task_queue/PriorityLifoSemMPMCQueue.h @@ -18,8 +18,10 @@ #include #include +#include #include #include +#include namespace folly { @@ -36,6 +38,15 @@ class PriorityLifoSemMPMCQueue : public BlockingQueue { } } + PriorityLifoSemMPMCQueue(folly::Range capacities) { + CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported"; + + queues_.reserve(capacities.size()); + for (auto capacity : capacities) { + queues_.emplace_back(capacity); + } + } + uint8_t getNumPriorities() override { return queues_.size(); } diff --git a/folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp b/folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp new file mode 100644 index 00000000..ba8c7eb7 --- /dev/null +++ b/folly/executors/task_queue/test/PriorityLifoSemMPMCQueueTest.cpp @@ -0,0 +1,38 @@ +/* + * Copyright 2017-present Facebook, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +using namespace folly; + +TEST(PriorityLifoSemMPMCQueue, Capacities) { + const std::vector capacities = {1, 2, 3}; + PriorityLifoSemMPMCQueue q( + folly::range(capacities)); + + for (auto capacity : folly::enumerate(capacities)) { + auto pri = static_cast(capacity.index) - 1; + for (size_t i = 0; i < *capacity; ++i) { + EXPECT_NO_THROW(q.addWithPriority(0, pri)) << *capacity << " " << i; + } + EXPECT_THROW(q.addWithPriority(0, pri), QueueFullException) << *capacity; + } +} -- 2.34.1