removing non-existing file from the build
[folly.git] / folly / wangle / concurrent / PriorityLifoSemMPMCQueue.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18 #include <folly/wangle/concurrent/BlockingQueue.h>
19 #include <folly/LifoSem.h>
20 #include <folly/MPMCQueue.h>
21
22 namespace folly { namespace wangle {
23
24 template <class T>
25 class PriorityLifoSemMPMCQueue : public BlockingQueue<T> {
26  public:
27   explicit PriorityLifoSemMPMCQueue(uint8_t numPriorities, size_t capacity) {
28     queues_.reserve(numPriorities);
29     for (int8_t i = 0; i < numPriorities; i++) {
30       queues_.push_back(MPMCQueue<T>(capacity));
31     }
32   }
33
34   uint8_t getNumPriorities() override {
35     return queues_.size();
36   }
37
38   // Add at medium priority by default
39   void add(T item) override {
40     addWithPriority(std::move(item), Executor::MID_PRI);
41   }
42
43   void addWithPriority(T item, int8_t priority) override {
44     int mid = getNumPriorities() / 2;
45     size_t queue = priority < 0 ?
46                    std::max(0, mid + priority) :
47                    std::min(getNumPriorities() - 1, mid + priority);
48     CHECK(queue < queues_.size());
49     if (!queues_[queue].write(std::move(item))) {
50       throw std::runtime_error("LifoSemMPMCQueue full, can't add item");
51     }
52     sem_.post();
53   }
54
55   T take() override {
56     T item;
57     while (true) {
58       for (auto it = queues_.rbegin(); it != queues_.rend(); it++) {
59         if (it->read(item)) {
60           return item;
61         }
62       }
63       sem_.wait();
64     }
65   }
66
67   size_t size() override {
68     size_t size = 0;
69     for (auto& q : queues_) {
70       size += q.size();
71     }
72     return size;
73   }
74
75  private:
76   LifoSem sem_;
77   std::vector<MPMCQueue<T>> queues_;
78 };
79
80 }} // folly::wangle