* the same queue. MPMC queue excels in this situation but dictates a max queue
* size.
*
+ * @note The default queue throws when full (folly::QueueBehaviorIfFull::THROW),
+ * so add() can fail. Furthermore, join() can also fail if the queue is full,
+ * because it enqueues numThreads poison tasks to stop the threads. If join() is
+ * needed to be guaranteed to succeed PriorityLifoSemMPMCQueue can be used
+ * instead, initializing the lowest priority's (LO_PRI) capacity to at least
+ * numThreads. Poisons use LO_PRI so if that priority is not used for any user
+ * task join() is guaranteed not to encounter a full queue.
+ *
* @note If a blocking queue (folly::QueueBehaviorIfFull::BLOCK) is used, and
* tasks executing on a given thread pool schedule more tasks, deadlock is
* possible if the queue becomes full. Deadlock is also possible if there is
#include <folly/Executor.h>
#include <folly/MPMCQueue.h>
+#include <folly/Range.h>
#include <folly/executors/task_queue/BlockingQueue.h>
#include <folly/synchronization/LifoSem.h>
+#include <glog/logging.h>
namespace folly {
}
}
+ PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) {
+ CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
+
+ queues_.reserve(capacities.size());
+ for (auto capacity : capacities) {
+ queues_.emplace_back(capacity);
+ }
+ }
+
uint8_t getNumPriorities() override {
return queues_.size();
}
--- /dev/null
+/*
+ * Copyright 2017-present Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <vector>
+
+#include <folly/Range.h>
+#include <folly/container/Enumerate.h>
+#include <folly/executors/task_queue/PriorityLifoSemMPMCQueue.h>
+#include <folly/portability/GTest.h>
+
+using namespace folly;
+
+TEST(PriorityLifoSemMPMCQueue, Capacities) {
+ const std::vector<size_t> capacities = {1, 2, 3};
+ PriorityLifoSemMPMCQueue<int, QueueBehaviorIfFull::THROW> q(
+ folly::range(capacities));
+
+ for (auto capacity : folly::enumerate(capacities)) {
+ auto pri = static_cast<int8_t>(capacity.index) - 1;
+ for (size_t i = 0; i < *capacity; ++i) {
+ EXPECT_NO_THROW(q.addWithPriority(0, pri)) << *capacity << " " << i;
+ }
+ EXPECT_THROW(q.addWithPriority(0, pri), QueueFullException) << *capacity;
+ }
+}