2 * Copyright 2017 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/Executor.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/executors/task_queue/BlockingQueue.h>
23 #include <folly/synchronization/LifoSem.h>
24 #include <glog/logging.h>
28 template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
29 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
// Note A: The queue pre-allocates all memory for max_capacity.
// Note B: folly::Executor::*_PRI values can be passed as priorities; when
// numPriorities == 2, MID_PRI and HI_PRI map to the same priority level.
34 PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
35 queues_.reserve(numPriorities);
36 for (int8_t i = 0; i < numPriorities; i++) {
37 queues_.emplace_back(max_capacity);
41 PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) {
42 CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
44 queues_.reserve(capacities.size());
45 for (auto capacity : capacities) {
46 queues_.emplace_back(capacity);
50 uint8_t getNumPriorities() override {
51 return queues_.size();
54 // Add at medium priority by default
55 void add(T item) override {
56 addWithPriority(std::move(item), folly::Executor::MID_PRI);
// Enqueues `item` on the internal queue selected by `priority`.
//
// `priority` is treated as an offset from the middle queue index
// (getNumPriorities() / 2). Negative offsets are clamped at index 0;
// non-negative offsets are clamped at getNumPriorities() - 1.
//
// What happens when the chosen queue is full depends on the kBehavior
// template parameter:
//   THROW - throws QueueFullException if write() returns false.
//   BLOCK - calls blockingWrite() (presumably waits for space; confirm
//           against folly::MPMCQueue).
// NOTE(review): the end of this function is not visible in this excerpt;
// any statements after the switch are undocumented here.
59 void addWithPriority(T item, int8_t priority) override {
60 int mid = getNumPriorities() / 2;
// Map the signed priority onto a valid queue index, clamping at both ends.
61 size_t queue = priority < 0
62 ? std::max(0, mid + priority)
63 : std::min(getNumPriorities() - 1, mid + priority);
// Guards the indexing below. With zero queues the min() above yields -1,
// which becomes a huge size_t and trips this check.
64 CHECK_LT(queue, queues_.size());
// kBehavior is a template parameter, so the same case is taken on every call
// for a given instantiation.
65 switch (kBehavior) { // static
66 case QueueBehaviorIfFull::THROW:
// write() signals failure by returning false; report that as an exception.
67 if (!queues_[queue].write(std::move(item))) {
68 throw QueueFullException("LifoSemMPMCQueue full, can't add item");
71 case QueueBehaviorIfFull::BLOCK:
72 queues_[queue].blockingWrite(std::move(item));
81 if (nonBlockingTake(item)) {
// Attempts to read one element into `item` by calling readIfNotEmpty() on
// the internal queues, visiting them from the last queue in queues_ to the
// first (so higher-indexed queues are tried before lower-indexed ones).
// NOTE(review): the statements executed after a successful read and after
// the loop are not visible in this excerpt - presumably `return true` /
// `return false`; confirm.
88 bool nonBlockingTake(T& item) {
// Reverse iteration: start from the highest queue index.
89 for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
90 if (it->readIfNotEmpty(item)) {
// Overrides the base-class size(). Visits every internal queue in queues_.
// NOTE(review): the accumulator and loop body are not visible in this
// excerpt - presumably sums the per-queue sizes; confirm.
97 size_t size() override {
99 for (auto& q : queues_) {
// Adds the sizeGuess() of every internal queue in queues_ to a running
// total named `size`.
// NOTE(review): the declaration/initialisation of `size` and the return
// statement are not visible in this excerpt; confirm they exist.
105 size_t sizeGuess() const {
107 for (auto& q : queues_) {
108 size += q.sizeGuess();
// One folly::MPMCQueue per priority level, populated by the constructors.
// addWithPriority() indexes into it with the clamped priority, and
// nonBlockingTake() scans it back-to-front, so higher indices are tried first.
115 std::vector<folly::MPMCQueue<T>> queues_;