Copyright 2014->2015
[folly.git] / folly / wangle / concurrent / PriorityLifoSemMPMCQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18 #include <folly/wangle/concurrent/BlockingQueue.h>
19 #include <folly/LifoSem.h>
20 #include <folly/MPMCQueue.h>
21
22 namespace folly { namespace wangle {
23
24 template <class T>
25 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
26  public:
27   explicit PriorityLifoSemMPMCQueue(uint32_t numPriorities, size_t capacity) {
28     CHECK(numPriorities > 0);
29     queues_.reserve(numPriorities);
30     for (uint32_t i = 0; i < numPriorities; i++) {
31       queues_.push_back(MPMCQueue<T>(capacity));
32     }
33   }
34
35   uint32_t getNumPriorities() override {
36     return queues_.size();
37   }
38
39   // Add at lowest priority by default
40   void add(T item) override {
41     addWithPriority(std::move(item), 0);
42   }
43
44   void addWithPriority(T item, uint32_t priority) override {
45     CHECK(priority < queues_.size());
46     if (!queues_[priority].write(std::move(item))) {
47       throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
48     }
49     sem_.post();
50   }
51
52   T take() override {
53     T item;
54     while (true) {
55       for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
56         if (it->read(item)) {
57           return item;
58         }
59       }
60       sem_.wait();
61     }
62   }
63
64   size_t size() override {
65     size_t size = 0;
66     for (auto& q : queues_) {
67       size += q.size();
68     }
69     return size;
70   }
71
72  private:
73   LifoSem sem_;
74   std::vector<MPMCQueue<T>> queues_;
75 };
76
77 }} // folly::wangle