Copyright 2014->2015
[folly.git] / folly / detail / MPMCPipelineDetail.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/MPMCQueue.h>
20
21 namespace folly {
22
23 template <class T, class... Stages> class MPMCPipeline;
24
25 template <class T, size_t Amp> class MPMCPipelineStage {
26  public:
27   typedef T value_type;
28   static constexpr size_t kAmplification = Amp;
29 };
30
31 namespace detail {
32
33 /**
34  * Helper template to determine value type and amplification whether or not
35  * we use MPMCPipelineStage<>
36  */
37 template <class T> struct PipelineStageInfo {
38   static constexpr size_t kAmplification = 1;
39   typedef T value_type;
40 };
41
42 template <class T, size_t Amp>
43 struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
44   static constexpr size_t kAmplification = Amp;
45   typedef T value_type;
46 };
47
48 /**
49  * Wrapper around MPMCQueue (friend) that keeps track of tickets.
50  */
51 template <class T>
52 class MPMCPipelineStageImpl {
53  public:
54   typedef T value_type;
55   template <class U, class... Stages> friend class MPMCPipeline;
56
57   // Implicit so that MPMCPipeline construction works
58   /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
59   MPMCPipelineStageImpl() { }
60
61   // only use on first stage, uses queue_.pushTicket_ instead of existing
62   // ticket
63   template <class... Args>
64   void blockingWrite(Args&&... args) noexcept {
65     queue_.blockingWrite(std::forward<Args>(args)...);
66   }
67
68   template <class... Args>
69   bool write(Args&&... args) noexcept {
70     return queue_.write(std::forward<Args>(args)...);
71   }
72
73   template <class... Args>
74   void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
75     queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
76   }
77
78   uint64_t blockingRead(T& elem) noexcept {
79     uint64_t ticket = queue_.popTicket_++;
80     queue_.dequeueWithTicket(ticket, elem);
81     return ticket;
82   }
83
84   bool read(T& elem) noexcept {  // only use on last stage, won't track ticket
85     return queue_.read(elem);
86   }
87
88   template <class... Args>
89   bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
90     if (queue_.tryObtainReadyPopTicket(ticket)) {
91       queue_.dequeueWithTicket(ticket, elem);
92       return true;
93     } else {
94       return false;
95     }
96   }
97
98   // See MPMCQueue<T>::writeCount; only works for the first stage
99   uint64_t writeCount() const noexcept {
100     return queue_.writeCount();
101   }
102
103   uint64_t readCount() const noexcept {
104     return queue_.readCount();
105   }
106
107  private:
108   MPMCQueue<T> queue_;
109 };
110
111 // Product of amplifications of a tuple of PipelineStageInfo<X>
112 template <class Tuple> struct AmplificationProduct;
113
114 template <> struct AmplificationProduct<std::tuple<>> {
115   static constexpr size_t value = 1;
116 };
117
118 template <class T, class... Ts>
119 struct AmplificationProduct<std::tuple<T, Ts...>> {
120   static constexpr size_t value =
121     T::kAmplification *
122     AmplificationProduct<std::tuple<Ts...>>::value;
123 };
124
125 }}  // namespaces