2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
18 #include <folly/wangle/concurrent/BlockingQueue.h>
19 #include <folly/LifoSem.h>
20 #include <folly/MPMCQueue.h>
22 namespace folly { namespace wangle {
25 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
// Constructs the queue with `numPriorities` priority levels.
// One MPMCQueue<T> is created per level, each constructed with `capacity`.
// `numPriorities` must be greater than zero; this is enforced with CHECK.
27 explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
28 CHECK(numPriorities > 0);
// Reserve storage for all levels before appending the per-level queues.
29 queues_.reserve(numPriorities);
30 for (uint32_t i = 0; i < numPriorities; i++) {
// Append a queue built from a temporary MPMCQueue<T>(capacity).
31 queues_.push_back(MPMCQueue<T>(capacity));
// Returns the number of priority levels, which is the number of queues
// currently held in queues_.
35 uint32_t getNumPriorities() override {
// vector::size() yields a size_t; it is implicitly converted to uint32_t here.
36 return queues_.size();
39 // Add at lowest priority by default
// Moves `item` into addWithPriority() using priority 0.
40 void add(T item) override {
41 addWithPriority(std::move(item), 0);
// Enqueues `item` on the queue stored at index `priority`.
// `priority` must be less than queues_.size(); this is enforced with CHECK.
// Throws std::runtime_error when write() on the selected queue returns false.
44 void addWithPriority(T item, uint32_t priority) override {
45 CHECK(priority < queues_.size());
// The item is moved into write(); a false result is reported as a full queue.
46 if (!queues_[priority].write(std::move(item))) {
47 throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
55 for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
// Returns a size_t describing the queue's size.
// NOTE(review): presumably the sum of the per-priority queue sizes; the
// accumulation and return statements are not visible here, so confirm.
64 size_t size() override {
// Visit every per-priority queue by reference.
66 for (auto& q : queues_) {
// One MPMCQueue<T> per priority level, indexed by priority value.
// Index 0 is the level that add() uses by default.
74 std::vector<MPMCQueue<T>> queues_;