2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Executor.h>
20 #include <folly/LifoSem.h>
21 #include <folly/MPMCQueue.h>
22 #include <folly/executors/task_queue/BlockingQueue.h>
// Priority-aware BlockingQueue<T> built on a vector of folly::MPMCQueue<T>,
// one queue per priority level.
//
// kBehavior selects what addWithPriority() does with the destination queue:
//   THROW - uses write() and throws QueueFullException if it reports failure.
//   BLOCK - uses blockingWrite().
26 template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
27 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
29 // Note A: The queue pre-allocates all memory for max_capacity
30 // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
31 // MID_PRI and HI_PRI are treated at the same priority level.
32 PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
33 queues_.reserve(numPriorities);
34 for (int8_t i = 0; i < numPriorities; i++) {
35 queues_.emplace_back(max_capacity);
39 uint8_t getNumPriorities() override {
40 return queues_.size();
43 // Add at medium priority by default
44 void add(T item) override {
45 addWithPriority(std::move(item), folly::Executor::MID_PRI);
// Enqueues `item` on the queue chosen by `priority`.
//
// `priority` is an offset from the middle queue: 0 selects index
// getNumPriorities() / 2, negative values select lower indices and positive
// values higher ones. Out-of-range offsets are clamped to the first or last
// queue rather than rejected.
48 void addWithPriority(T item, int8_t priority) override {
// Index that priority 0 maps to.
49 int mid = getNumPriorities() / 2;
// Clamp below at 0 for negative priorities and above at the last index for
// the rest. The arithmetic is done in int; the result is then stored in a
// size_t, so a negative result (only possible when there are no queues)
// becomes a huge value that the check below rejects.
50 size_t queue = priority < 0
51 ? std::max(0, mid + priority)
52 : std::min(getNumPriorities() - 1, mid + priority);
// Guard against indexing past the end of queues_.
53 CHECK_LT(queue, queues_.size());
// kBehavior is a template parameter, so the case taken is fixed for each
// instantiation of the class.
54 switch (kBehavior) { // static
55 case QueueBehaviorIfFull::THROW:
// write() reporting failure is surfaced to the caller as an exception.
56 if (!queues_[queue].write(std::move(item))) {
57 throw QueueFullException("LifoSemMPMCQueue full, can't add item");
60 case QueueBehaviorIfFull::BLOCK:
// NOTE(review): presumably waits until the queue has room - confirm
// against folly::MPMCQueue::blockingWrite.
61 queues_[queue].blockingWrite(std::move(item));
// Try a non-blocking dequeue into `item`; nonBlockingTake() reports through
// its bool result whether one was obtained.
// NOTE(review): the enclosing function's signature is not visible here;
// presumably this is the blocking take() path - confirm.
70 if (nonBlockingTake(item)) {
// Attempts to dequeue one element into `item`.
//
// Queues are visited from the back of queues_ to the front. Since
// addWithPriority() maps larger priorities to larger indices, the
// highest-priority queue is tried first.
//
// @param item destination passed to readIfNotEmpty() on each queue.
77 bool nonBlockingTake(T& item) {
78 for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
// NOTE(review): readIfNotEmpty presumably fills `item` and returns true only
// when the queue had an element - confirm against folly::MPMCQueue.
79 if (it->readIfNotEmpty(item)) {
// Size query required by the base interface (declared override). Walks every
// per-priority queue in queues_.
// NOTE(review): presumably sums the individual queue sizes - confirm.
86 size_t size() override {
88 for (auto& q : queues_) {
// Const size estimate: adds up sizeGuess() from every per-priority queue.
// NOTE(review): the value is presumably approximate under concurrent use -
// confirm against folly::MPMCQueue::sizeGuess.
94 size_t sizeGuess() const {
96 for (auto& q : queues_) {
// Accumulate this queue's estimate into the running total.
97 size += q.sizeGuess();
// One queue per priority level, filled by the constructor. Higher indices
// hold higher priorities: addWithPriority() maps larger priorities to larger
// indices, and nonBlockingTake() scans from the back.
104 std::vector<folly::MPMCQueue<T>> queues_;