rename type_traits to traits in flat_combining namespace
[libcds.git] / cds / container / fcqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_FCQUEUE_H
4 #define __CDS_CONTAINER_FCQUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <queue>
9
10 namespace cds { namespace container {
11
12     /// FCQueue related definitions
13     /** @ingroup cds_nonintrusive_helper
14     */
15     namespace fcqueue {
16
17         /// FCQueue internal statistics
18         template <typename Counter = cds::atomicity::event_counter >
19         struct stat: public cds::algo::flat_combining::stat<Counter>
20         {
21             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
22             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
23
24             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
25             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
26             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
27             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
28             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
29
30             //@cond
31             void    onEnqueue()               { ++m_nEnqueue; }
32             void    onEnqMove()               { ++m_nEnqMove; }
33             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
34             void    onCollide()               { ++m_nCollided; }
35             //@endcond
36         };
37
38         /// FCQueue dummy statistics, no overhead
39         struct empty_stat: public cds::algo::flat_combining::empty_stat
40         {
41             //@cond
42             void    onEnqueue()     {}
43             void    onEnqMove()     {}
44             void    onDequeue(bool) {}
45             void    onCollide()     {}
46             //@endcond
47         };
48
49         /// FCQueue type traits
50         struct traits: public cds::algo::flat_combining::traits
51         {
52             typedef empty_stat      stat;   ///< Internal statistics
53             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
54         };
55
56         /// Metafunction converting option list to traits
57         /**
58             \p Options are:
59             - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
60             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
61             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
63             - \p opt::memory_model - C++ memory ordering model.
64                 List of all available memory ordering see \p opt::memory_model.
65                 Default is \p cds::opt::v:relaxed_ordering
66             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
68                 is empty.
69         */
70         template <typename... Options>
71         struct make_traits {
72 #   ifdef CDS_DOXYGEN_INVOKED
73             typedef implementation_defined type ;   ///< Metafunction result
74 #   else
75             typedef typename cds::opt::make_options<
76                 typename cds::opt::find_type_traits< traits, Options... >::type
77                 ,Options...
78             >::type   type;
79 #   endif
80         };
81
82     } // namespace fcqueue
83
84     /// Flat-combining queue
85     /**
86         @ingroup cds_nonintrusive_queue
87         @ingroup cds_flat_combining_container
88
89         \ref cds_flat_combining_description "Flat combining" sequential queue.
90         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
91
92         Template parameters:
93         - \p T - a value type stored in the queue
94         - \p Queue - sequential queue implementation, default is \p std::queue<T>
95         - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
96             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
97     */
98     template <typename T,
99         class Queue = std::queue<T>,
100         typename Traits = fcqueue::traits
101     >
102     class FCQueue
103 #ifndef CDS_DOXYGEN_INVOKED
104         : public cds::algo::flat_combining::container
105 #endif
106     {
107     public:
108         typedef T           value_type;     ///< Value type
109         typedef Queue       queue_type;     ///< Sequential queue class
110         typedef Traits      traits;         ///< Queue type traits
111
112         typedef typename traits::stat  stat;   ///< Internal statistics type
113         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
114
115     protected:
116         //@cond
117         /// Queue operation IDs
118         enum fc_operation {
119             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
120             op_enq_move,   ///< Enqueue (move semantics)
121             op_deq,         ///< Dequeue
122             op_clear        ///< Clear
123         };
124
125         /// Flat combining publication list record
126         struct fc_record: public cds::algo::flat_combining::publication_record
127         {
128             union {
129                 value_type const *  pValEnq;  ///< Value to enqueue
130                 value_type *        pValDeq;  ///< Dequeue destination
131             };
132             bool            bEmpty; ///< \p true if the queue is empty
133         };
134         //@endcond
135
136         /// Flat combining kernel
137         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
138
139     protected:
140         //@cond
141         fc_kernel   m_FlatCombining;
142         queue_type  m_Queue;
143         //@endcond
144
145     public:
146         /// Initializes empty queue object
147         FCQueue()
148         {}
149
150         /// Initializes empty queue object and gives flat combining parameters
151         FCQueue(
152             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
153             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
154             )
155             : m_FlatCombining( nCompactFactor, nCombinePassCount )
156         {}
157
158         /// Inserts a new element at the end of the queue
159         /**
160             The content of the new element initialized to a copy of \p val.
161
162             The function always returns \p true
163         */
164         bool enqueue( value_type const& val )
165         {
166             fc_record * pRec = m_FlatCombining.acquire_record();
167             pRec->pValEnq = &val;
168
169             if ( c_bEliminationEnabled )
170                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
171             else
172                 m_FlatCombining.combine( op_enq, pRec, *this );
173
174             assert( pRec->is_done() );
175             m_FlatCombining.release_record( pRec );
176             m_FlatCombining.internal_statistics().onEnqueue();
177             return true;
178         }
179
180         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
181         bool push( value_type const& val )
182         {
183             return enqueue( val );
184         }
185
186         /// Inserts a new element at the end of the queue (move semantics)
187         /**
188             \p val is moved to inserted element
189         */
190         bool enqueue( value_type&& val )
191         {
192             fc_record * pRec = m_FlatCombining.acquire_record();
193             pRec->pValEnq = &val;
194
195             if ( c_bEliminationEnabled )
196                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
197             else
198                 m_FlatCombining.combine( op_enq_move, pRec, *this );
199
200             assert( pRec->is_done() );
201             m_FlatCombining.release_record( pRec );
202
203             m_FlatCombining.internal_statistics().onEnqMove();
204             return true;
205         }
206
207         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
208         bool push( value_type&& val )
209         {
210             return enqueue( val );
211         }
212
213         /// Removes the next element from the queue
214         /**
215             \p val takes a copy of the element
216         */
217         bool dequeue( value_type& val )
218         {
219             fc_record * pRec = m_FlatCombining.acquire_record();
220             pRec->pValDeq = &val;
221
222             if ( c_bEliminationEnabled )
223                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
224             else
225                 m_FlatCombining.combine( op_deq, pRec, *this );
226
227             assert( pRec->is_done() );
228             m_FlatCombining.release_record( pRec );
229
230             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
231             return !pRec->bEmpty;
232         }
233
234         /// Removes the next element from the queue (a synonym for \ref dequeue)
235         bool pop( value_type& val )
236         {
237             return dequeue( val );
238         }
239
240         /// Clears the queue
241         void clear()
242         {
243             fc_record * pRec = m_FlatCombining.acquire_record();
244
245             if ( c_bEliminationEnabled )
246                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
247             else
248                 m_FlatCombining.combine( op_clear, pRec, *this );
249
250             assert( pRec->is_done() );
251             m_FlatCombining.release_record( pRec );
252         }
253
254         /// Returns the number of elements in the queue.
255         /**
256             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
257             combining record can be in process.
258             To check emptiness use \ref empty function.
259         */
260         size_t size() const
261         {
262             return m_Queue.size();
263         }
264
265         /// Checks if the queue is empty
266         /**
267             If the combining is in process the function waits while combining done.
268         */
269         bool empty() const
270         {
271             m_FlatCombining.wait_while_combining();
272             return m_Queue.empty();
273         }
274
275         /// Internal statistics
276         stat const& statistics() const
277         {
278             return m_FlatCombining.statistics();
279         }
280
281     public: // flat combining cooperation, not for direct use!
282         //@cond
283         /// Flat combining supporting function. Do not call it directly!
284         /**
285             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
286             object if the current thread becomes a combiner. Invocation of the function means that
287             the queue should perform an action recorded in \p pRec.
288         */
289         void fc_apply( fc_record * pRec )
290         {
291             assert( pRec );
292
293             switch ( pRec->op() ) {
294             case op_enq:
295                 assert( pRec->pValEnq );
296                 m_Queue.push( *(pRec->pValEnq ) );
297                 break;
298             case op_enq_move:
299                 assert( pRec->pValEnq );
300                 m_Queue.push( std::move( *(pRec->pValEnq )) );
301                 break;
302             case op_deq:
303                 assert( pRec->pValDeq );
304                 pRec->bEmpty = m_Queue.empty();
305                 if ( !pRec->bEmpty ) {
306                     *(pRec->pValDeq) = m_Queue.front();
307                     m_Queue.pop();
308                 }
309                 break;
310             case op_clear:
311                 while ( !m_Queue.empty() )
312                     m_Queue.pop();
313                 break;
314             default:
315                 assert(false);
316                 break;
317             }
318         }
319
320         /// Batch-processing flat combining
321         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
322         {
323             typedef typename fc_kernel::iterator fc_iterator;
324             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
325                 switch ( it->op() ) {
326                 case op_enq:
327                 case op_enq_move:
328                 case op_deq:
329                     if ( m_Queue.empty() ) {
330                         if ( itPrev != itEnd && collide( *itPrev, *it ))
331                             itPrev = itEnd;
332                         else
333                             itPrev = it;
334                     }
335                     break;
336                 }
337             }
338         }
339         //@endcond
340
341     private:
342         //@cond
343         bool collide( fc_record& rec1, fc_record& rec2 )
344         {
345             switch ( rec1.op() ) {
346                 case op_enq:
347                     if ( rec2.op() == op_deq ) {
348                         assert(rec1.pValEnq);
349                         assert(rec2.pValDeq);
350                         *rec2.pValDeq = *rec1.pValEnq;
351                         rec2.bEmpty = false;
352                         goto collided;
353                     }
354                     break;
355                 case op_enq_move:
356                     if ( rec2.op() == op_deq ) {
357                         assert(rec1.pValEnq);
358                         assert(rec2.pValDeq);
359                         *rec2.pValDeq = std::move( *rec1.pValEnq );
360                         rec2.bEmpty = false;
361                         goto collided;
362                     }
363                     break;
364                 case op_deq:
365                     switch ( rec2.op() ) {
366                     case op_enq:
367                     case op_enq_move:
368                         return collide( rec2, rec1 );
369                     }
370             }
371             return false;
372
373         collided:
374             m_FlatCombining.operation_done( rec1 );
375             m_FlatCombining.operation_done( rec2 );
376             m_FlatCombining.internal_statistics().onCollide();
377             return true;
378         }
379         //@endcond
380
381     };
382 }} // namespace cds::container
383
384 #endif // #ifndef __CDS_CONTAINER_FCQUEUE_H