Make some AsyncTest methods virtual to allow mocking them using gtest/gmock
[folly.git] / folly / PriorityMPMCQueue.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16 #pragma once
17
18 #include <glog/logging.h>
19 #include <algorithm>
20 #include <vector>
21
22 #include <folly/MPMCQueue.h>
23
24 namespace folly {
25
26 /// PriorityMPMCQueue is a thin wrapper on MPMCQueue, providing priorities
27 /// by managing multiple underlying MPMCQueues. As of now, this does
28 /// not implement a blocking interface. For the purposes of this
29 /// class, lower number is higher priority
30
31 template <class T>
32 class PriorityMPMCQueue {
33  public:
34   PriorityMPMCQueue(size_t numPriorities, size_t capacity) {
35     CHECK_GT(numPriorities, 0);
36     queues_.reserve(numPriorities);
37     for (size_t i = 0; i < numPriorities; i++) {
38       queues_.emplace_back(capacity);
39     }
40   }
41
42   size_t getNumPriorities() {
43     return queues_.size();
44   }
45
46   // Add at medium priority by default
47   bool write(T&& item) {
48     return writeWithPriority(std::move(item), getNumPriorities() / 2);
49   }
50
51   bool writeWithPriority(T&& item, size_t priority) {
52     size_t queue = std::min(getNumPriorities() - 1, priority);
53     CHECK_LT(queue, queues_.size());
54     return queues_.at(queue).write(std::move(item));
55   }
56
57   bool read(T& item) {
58     for (auto& q : queues_) {
59       if (q.readIfNotEmpty(item)) {
60         return true;
61       }
62     }
63     return false;
64   }
65
66   size_t size() const {
67     size_t total_size = 0;
68     for (auto& q : queues_) {
69       // MPMCQueue can have a negative size if there are pending readers.
70       // Since we don't expose a blocking interface this shouldn't happen,
71       // But just in case we put a floor at 0
72       total_size += std::max<ssize_t>(0, q.size());
73     }
74     return total_size;
75   }
76
77   size_t sizeGuess() const {
78     size_t total_size = 0;
79     for (auto& q : queues_) {
80       // MPMCQueue can have a negative size if there are pending readers.
81       // Since we don't expose a blocking interface this shouldn't happen,
82       // But just in case we put a floor at 0
83       total_size += std::max<ssize_t>(0, q.sizeGuess());
84     }
85     return total_size;
86   }
87
88   /// Returns true if there are no items available for dequeue
89   bool isEmpty() const {
90     return size() == 0;
91   }
92
93  private:
94   std::vector<folly::MPMCQueue<T>> queues_;
95 };
96
97 } // namespace folly