Merge pull request #36 from khegeman/integration
[libcds.git] / cds / intrusive / fcqueue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
4 #define CDSLIB_INTRUSIVE_FCQUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <cds/intrusive/options.h>
9 #include <boost/intrusive/list.hpp>
10
11 namespace cds { namespace intrusive {
12
13     /// FCQueue related definitions
14     namespace fcqueue {
15
16         /// FCQueue internal statistics
17         template <typename Counter = cds::atomicity::event_counter >
18         struct stat: public cds::algo::flat_combining::stat<Counter>
19         {
20             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
21             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
22
23             counter_type    m_nEnqueue     ;   ///< Count of push operations
24             counter_type    m_nDequeue     ;   ///< Count of success pop operations
25             counter_type    m_nFailedDeq   ;   ///< Count of failed pop operations (pop from empty queue)
26             counter_type    m_nCollided    ;   ///< How many pairs of push/pop were collided, if elimination is enabled
27
28             //@cond
29             void    onEnqueue()                 { ++m_nEnqueue; }
30             void    onDequeue( bool bFailed )   { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
31             void    onCollide()                 { ++m_nCollided; }
32             //@endcond
33         };
34
35         /// FCQueue dummy statistics, no overhead
36         struct empty_stat: public cds::algo::flat_combining::empty_stat
37         {
38             //@cond
39             void    onEnqueue()     {}
40             void    onDequeue(bool) {}
41             void    onCollide()     {}
42             //@endcond
43         };
44
45         /// FCQueue type traits
46         struct traits: public cds::algo::flat_combining::traits
47         {
48             typedef cds::intrusive::opt::v::empty_disposer  disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
49             typedef empty_stat      stat;   ///< Internal statistics
50             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
51         };
52
53         /// Metafunction converting option list to traits
54         /**
55             \p Options are:
56             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
57             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
58             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
59                 This option is used only in \p FCQueue::clear() function.
60             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
61             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
62             - \p opt::memory_model - C++ memory ordering model.
63                 List of all available memory ordering see \p opt::memory_model.
64                 Default is \p cds::opt::v:relaxed_ordering
65             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
66                 By default, the elimination is disabled.
67         */
68         template <typename... Options>
69         struct make_traits {
70 #   ifdef CDS_DOXYGEN_INVOKED
71             typedef implementation_defined type ;   ///< Metafunction result
72 #   else
73             typedef typename cds::opt::make_options<
74                 typename cds::opt::find_type_traits< traits, Options... >::type
75                 ,Options...
76             >::type   type;
77 #   endif
78         };
79     } // namespace fcqueue
80
81     /// Flat-combining intrusive queue
82     /**
83         @ingroup cds_intrusive_queue
84         @ingroup cds_flat_combining_intrusive
85
86         \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
87
88         Template parameters:
89         - \p T - a value type stored in the queue
90         - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
91             Default is \p boost::intrusive::list
92         - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
93             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
94     */
95     template <typename T
96         ,class Container = boost::intrusive::list<T>
97         ,typename Traits = fcqueue::traits
98     >
99     class FCQueue
100 #ifndef CDS_DOXYGEN_INVOKED
101         : public cds::algo::flat_combining::container
102 #endif
103     {
104     public:
105         typedef T           value_type;     ///< Value type
106         typedef Container   container_type; ///< Sequential container type
107         typedef Traits      traits;         ///< Queue traits
108
109         typedef typename traits::disposer   disposer;   ///< The disposer functor. The disposer is used only in \ref clear() function
110         typedef typename traits::stat       stat;   ///< Internal statistics type
111         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
112
113     protected:
114         //@cond
115         /// Queue operation IDs
116         enum fc_operation {
117             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
118             op_deq,                 ///< Dequeue
119             op_clear,               ///< Clear
120             op_clear_and_dispose    ///< Clear and dispose
121         };
122
123         /// Flat combining publication list record
124         struct fc_record: public cds::algo::flat_combining::publication_record
125         {
126             value_type * pVal;  ///< Value to enqueue or dequeue
127             bool         bEmpty; ///< \p true if the queue is empty
128         };
129         //@endcond
130
131         /// Flat combining kernel
132         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
133
134     protected:
135         //@cond
136         fc_kernel       m_FlatCombining;
137         container_type  m_Queue;
138         //@endcond
139
140     public:
141         /// Initializes empty queue object
142         FCQueue()
143         {}
144
145         /// Initializes empty queue object and gives flat combining parameters
146         FCQueue(
147             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
148             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
149             )
150             : m_FlatCombining( nCompactFactor, nCombinePassCount )
151         {}
152
153         /// Inserts a new element at the end of the queue
154         /**
155             The function always returns \p true.
156         */
157         bool enqueue( value_type& val )
158         {
159             fc_record * pRec = m_FlatCombining.acquire_record();
160             pRec->pVal = &val;
161
162             if ( c_bEliminationEnabled )
163                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
164             else
165                 m_FlatCombining.combine( op_enq, pRec, *this );
166
167             assert( pRec->is_done() );
168             m_FlatCombining.release_record( pRec );
169             m_FlatCombining.internal_statistics().onEnqueue();
170             return true;
171         }
172
173         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
174         bool push( value_type& val )
175         {
176             return enqueue( val );
177         }
178
179         /// Removes the next element from the queue
180         /**
181             If the queue is empty the function returns \p nullptr
182         */
183         value_type * dequeue()
184         {
185             fc_record * pRec = m_FlatCombining.acquire_record();
186             pRec->pVal = nullptr;
187
188             if ( c_bEliminationEnabled )
189                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
190             else
191                 m_FlatCombining.combine( op_deq, pRec, *this );
192
193             assert( pRec->is_done() );
194             m_FlatCombining.release_record( pRec );
195
196             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
197             return pRec->pVal;
198         }
199
200         /// Removes the next element from the queue (a synonym for \ref dequeue)
201         value_type * pop()
202         {
203             return dequeue();
204         }
205
206         /// Clears the queue
207         /**
208             If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
209             will be called for each removed element.
210         */
211         void clear( bool bDispose = false )
212         {
213             fc_record * pRec = m_FlatCombining.acquire_record();
214
215             if ( c_bEliminationEnabled )
216                 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
217             else
218                 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
219
220             assert( pRec->is_done() );
221             m_FlatCombining.release_record( pRec );
222         }
223
224         /// Returns the number of elements in the queue.
225         /**
226             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
227             combining record can be in process.
228             To check emptiness use \ref empty function.
229         */
230         size_t size() const
231         {
232             return m_Queue.size();
233         }
234
235         /// Checks if the queue is empty
236         /**
237             If the combining is in process the function waits while it is done.
238         */
239         bool empty() const
240         {
241             m_FlatCombining.wait_while_combining();
242             return m_Queue.empty();
243         }
244
245         /// Internal statistics
246         stat const& statistics() const
247         {
248             return m_FlatCombining.statistics();
249         }
250
251     public: // flat combining cooperation, not for direct use!
252         //@cond
253         /// Flat combining supporting function. Do not call it directly!
254         /**
255             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
256             object if the current thread becomes a combiner. Invocation of the function means that
257             the queue should perform an action recorded in \p pRec.
258         */
259         void fc_apply( fc_record * pRec )
260         {
261             assert( pRec );
262
263             switch ( pRec->op() ) {
264             case op_enq:
265                 assert( pRec->pVal );
266                 m_Queue.push_back( *(pRec->pVal ) );
267                 break;
268             case op_deq:
269                 pRec->bEmpty = m_Queue.empty();
270                 if ( !pRec->bEmpty ) {
271                     pRec->pVal = &m_Queue.front();
272                     m_Queue.pop_front();
273                 }
274                 break;
275             case op_clear:
276                 m_Queue.clear();
277                 break;
278             case op_clear_and_dispose:
279                 m_Queue.clear_and_dispose( disposer() );
280                 break;
281             default:
282                 assert(false);
283                 break;
284             }
285         }
286
287         /// Batch-processing flat combining
288         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
289         {
290             typedef typename fc_kernel::iterator fc_iterator;
291             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
292                 switch ( it->op() ) {
293                 case op_enq:
294                 case op_deq:
295                     if ( m_Queue.empty() ) {
296                         if ( itPrev != itEnd && collide( *itPrev, *it ))
297                             itPrev = itEnd;
298                         else
299                             itPrev = it;
300                     }
301                     break;
302                 }
303             }
304         }
305         //@endcond
306
307     private:
308         //@cond
309         bool collide( fc_record& rec1, fc_record& rec2 )
310         {
311             assert( m_Queue.empty() );
312
313             switch ( rec1.op() ) {
314                 case op_enq:
315                     if ( rec2.op() == op_deq ) {
316                         assert(rec1.pVal);
317                         rec2.pVal = rec1.pVal;
318                         rec2.bEmpty = false;
319                         m_FlatCombining.operation_done( rec1 );
320                         m_FlatCombining.operation_done( rec2 );
321                         m_FlatCombining.internal_statistics().onCollide();
322                         return true;
323                     }
324                     break;
325                 case op_deq:
326                     if ( rec2.op() == op_enq ) {
327                         assert(rec2.pVal);
328                         rec1.pVal = rec2.pVal;
329                         rec1.bEmpty = false;
330                         m_FlatCombining.operation_done( rec1 );
331                         m_FlatCombining.operation_done( rec2 );
332                         m_FlatCombining.internal_statistics().onCollide();
333                         return true;
334                     }
335                     break;
336             }
337             return false;
338         }
339         //@endcond
340     };
341
342 }} // namespace cds::intrusive
343
344 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H