Allow to specify per-priority capacities in PriorityLifoSemMPMCQueue
[folly.git] / folly / executors / task_queue / PriorityLifoSemMPMCQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Executor.h>
20 #include <folly/MPMCQueue.h>
21 #include <folly/Range.h>
22 #include <folly/executors/task_queue/BlockingQueue.h>
23 #include <folly/synchronization/LifoSem.h>
24 #include <glog/logging.h>
25
26 namespace folly {
27
28 template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
29 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
30  public:
31   // Note A: The queue pre-allocates all memory for max_capacity
32   // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
33   //         MID_PRI and HI_PRI are treated at the same priority level.
34   PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
35     queues_.reserve(numPriorities);
36     for (int8_t i = 0; i < numPriorities; i++) {
37       queues_.emplace_back(max_capacity);
38     }
39   }
40
41   PriorityLifoSemMPMCQueue(folly::Range<const size_t*> capacities) {
42     CHECK_LT(capacities.size(), 256) << "At most 255 priorities supported";
43
44     queues_.reserve(capacities.size());
45     for (auto capacity : capacities) {
46       queues_.emplace_back(capacity);
47     }
48   }
49
50   uint8_t getNumPriorities() override {
51     return queues_.size();
52   }
53
54   // Add at medium priority by default
55   void add(T item) override {
56     addWithPriority(std::move(item), folly::Executor::MID_PRI);
57   }
58
59   void addWithPriority(T item, int8_t priority) override {
60     int mid = getNumPriorities() / 2;
61     size_t queue = priority < 0
62         ? std::max(0, mid + priority)
63         : std::min(getNumPriorities() - 1, mid + priority);
64     CHECK_LT(queue, queues_.size());
65     switch (kBehavior) { // static
66       case QueueBehaviorIfFull::THROW:
67         if (!queues_[queue].write(std::move(item))) {
68           throw QueueFullException("LifoSemMPMCQueue full, can't add item");
69         }
70         break;
71       case QueueBehaviorIfFull::BLOCK:
72         queues_[queue].blockingWrite(std::move(item));
73         break;
74     }
75     sem_.post();
76   }
77
78   T take() override {
79     T item;
80     while (true) {
81       if (nonBlockingTake(item)) {
82         return item;
83       }
84       sem_.wait();
85     }
86   }
87
88   bool nonBlockingTake(T& item) {
89     for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
90       if (it->readIfNotEmpty(item)) {
91         return true;
92       }
93     }
94     return false;
95   }
96
97   size_t size() override {
98     size_t size = 0;
99     for (auto& q : queues_) {
100       size += q.size();
101     }
102     return size;
103   }
104
105   size_t sizeGuess() const {
106     size_t size = 0;
107     for (auto& q : queues_) {
108       size += q.sizeGuess();
109     }
110     return size;
111   }
112
113  private:
114   folly::LifoSem sem_;
115   std::vector<folly::MPMCQueue<T>> queues_;
116 };
117
118 } // namespace folly