797287c0c4ae6082db5b9d2e243d898b2b723798
[folly.git] / folly / executors / PriorityLifoSemMPMCQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/Executor.h>
20 #include <folly/LifoSem.h>
21 #include <folly/MPMCQueue.h>
22 #include <folly/executors/BlockingQueue.h>
23
24 namespace folly {
25
26 template <class T, QueueBehaviorIfFull kBehavior = QueueBehaviorIfFull::THROW>
27 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
28  public:
29   // Note A: The queue pre-allocates all memory for max_capacity
30   // Note B: To use folly::Executor::*_PRI, for numPriorities == 2
31   //         MID_PRI and HI_PRI are treated at the same priority level.
32   PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t max_capacity) {
33     queues_.reserve(numPriorities);
34     for (int8_t i = 0; i < numPriorities; i++) {
35       queues_.emplace_back(max_capacity);
36     }
37   }
38
39   uint8_t getNumPriorities() override {
40     return queues_.size();
41   }
42
43   // Add at medium priority by default
44   void add(T item) override {
45     addWithPriority(std::move(item), folly::Executor::MID_PRI);
46   }
47
48   void addWithPriority(T item, int8_t priority) override {
49     int mid = getNumPriorities() / 2;
50     size_t queue = priority < 0
51         ? std::max(0, mid + priority)
52         : std::min(getNumPriorities() - 1, mid + priority);
53     CHECK_LT(queue, queues_.size());
54     switch (kBehavior) { // static
55       case QueueBehaviorIfFull::THROW:
56         if (!queues_[queue].write(std::move(item))) {
57           throw QueueFullException("LifoSemMPMCQueue full, can't add item");
58         }
59         break;
60       case QueueBehaviorIfFull::BLOCK:
61         queues_[queue].blockingWrite(std::move(item));
62         break;
63     }
64     sem_.post();
65   }
66
67   T take() override {
68     T item;
69     while (true) {
70       if (nonBlockingTake(item)) {
71         return item;
72       }
73       sem_.wait();
74     }
75   }
76
77   bool nonBlockingTake(T& item) {
78     for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
79       if (it->readIfNotEmpty(item)) {
80         return true;
81       }
82     }
83     return false;
84   }
85
86   size_t size() override {
87     size_t size = 0;
88     for (auto& q : queues_) {
89       size += q.size();
90     }
91     return size;
92   }
93
94   size_t sizeGuess() const {
95     size_t size = 0;
96     for (auto& q : queues_) {
97       size += q.sizeGuess();
98     }
99     return size;
100   }
101
102  private:
103   folly::LifoSem sem_;
104   std::vector<folly::MPMCQueue<T>> queues_;
105 };
106
107 } // namespace folly