2017
[folly.git] / folly / detail / MPMCPipelineDetail.h
1 /*
2  * Copyright 2017 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/MPMCQueue.h>
20
21 namespace folly {
22
23 template <class T, class... Stages> class MPMCPipeline;
24
25 template <class T, size_t Amp> class MPMCPipelineStage {
26  public:
27   typedef T value_type;
28   static constexpr size_t kAmplification = Amp;
29 };
30
31 namespace detail {
32
33 /**
34  * Helper template to determine value type and amplification whether or not
35  * we use MPMCPipelineStage<>
36  */
37 template <class T> struct PipelineStageInfo {
38   static constexpr size_t kAmplification = 1;
39   typedef T value_type;
40 };
41
42 template <class T, size_t Amp>
43 struct PipelineStageInfo<MPMCPipelineStage<T, Amp>> {
44   static constexpr size_t kAmplification = Amp;
45   typedef T value_type;
46 };
47
48 /**
49  * Wrapper around MPMCQueue (friend) that keeps track of tickets.
50  */
51 template <class T>
52 class MPMCPipelineStageImpl {
53  public:
54   typedef T value_type;
55   template <class U, class... Stages> friend class MPMCPipeline;
56
57   // Implicit so that MPMCPipeline construction works
58   /* implicit */ MPMCPipelineStageImpl(size_t capacity) : queue_(capacity) { }
59   MPMCPipelineStageImpl() { }
60
61   // only use on first stage, uses queue_.pushTicket_ instead of existing
62   // ticket
63   template <class... Args>
64   void blockingWrite(Args&&... args) noexcept {
65     queue_.blockingWrite(std::forward<Args>(args)...);
66   }
67
68   template <class... Args>
69   bool write(Args&&... args) noexcept {
70     return queue_.write(std::forward<Args>(args)...);
71   }
72
73   template <class... Args>
74   void blockingWriteWithTicket(uint64_t ticket, Args&&... args) noexcept {
75     queue_.enqueueWithTicket(ticket, std::forward<Args>(args)...);
76   }
77
78   uint64_t blockingRead(T& elem) noexcept {
79     uint64_t ticket;
80     queue_.blockingReadWithTicket(ticket, elem);
81     return ticket;
82   }
83
84   bool read(T& elem) noexcept {  // only use on last stage, won't track ticket
85     return queue_.read(elem);
86   }
87
88   template <class... Args>
89   bool readAndGetTicket(uint64_t& ticket, T& elem) noexcept {
90     return queue_.readAndGetTicket(ticket, elem);
91   }
92
93   // See MPMCQueue<T>::writeCount; only works for the first stage
94   uint64_t writeCount() const noexcept {
95     return queue_.writeCount();
96   }
97
98   uint64_t readCount() const noexcept {
99     return queue_.readCount();
100   }
101
102  private:
103   MPMCQueue<T> queue_;
104 };
105
106 // Product of amplifications of a tuple of PipelineStageInfo<X>
107 template <class Tuple> struct AmplificationProduct;
108
109 template <> struct AmplificationProduct<std::tuple<>> {
110   static constexpr size_t value = 1;
111 };
112
113 template <class T, class... Ts>
114 struct AmplificationProduct<std::tuple<T, Ts...>> {
115   static constexpr size_t value =
116     T::kAmplification *
117     AmplificationProduct<std::tuple<Ts...>>::value;
118 };
119
120 }}  // namespaces