Merge branch 'dev'
[libcds.git] / cds / container / fcqueue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
4 #define CDSLIB_CONTAINER_FCQUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <queue>
9
10 namespace cds { namespace container {
11
12     /// FCQueue related definitions
13     /** @ingroup cds_nonintrusive_helper
14     */
15     namespace fcqueue {
16
17         /// FCQueue internal statistics
18         template <typename Counter = cds::atomicity::event_counter >
19         struct stat: public cds::algo::flat_combining::stat<Counter>
20         {
21             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
22             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
23
24             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
25             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
26             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
27             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
28             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
29
30             //@cond
31             void    onEnqueue()               { ++m_nEnqueue; }
32             void    onEnqMove()               { ++m_nEnqMove; }
33             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
34             void    onCollide()               { ++m_nCollided; }
35             //@endcond
36         };
37
38         /// FCQueue dummy statistics, no overhead
39         struct empty_stat: public cds::algo::flat_combining::empty_stat
40         {
41             //@cond
42             void    onEnqueue()     {}
43             void    onEnqMove()     {}
44             void    onDequeue(bool) {}
45             void    onCollide()     {}
46             //@endcond
47         };
48
49         /// FCQueue type traits
50         struct traits: public cds::algo::flat_combining::traits
51         {
52             typedef empty_stat      stat;   ///< Internal statistics
53             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
54         };
55
56         /// Metafunction converting option list to traits
57         /**
58             \p Options are:
59             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
60             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
61             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
63             - \p opt::memory_model - C++ memory ordering model.
64                 List of all available memory ordering see \p opt::memory_model.
65                 Default is \p cds::opt::v:relaxed_ordering
66             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
68                 is empty.
69         */
70         template <typename... Options>
71         struct make_traits {
72 #   ifdef CDS_DOXYGEN_INVOKED
73             typedef implementation_defined type ;   ///< Metafunction result
74 #   else
75             typedef typename cds::opt::make_options<
76                 typename cds::opt::find_type_traits< traits, Options... >::type
77                 ,Options...
78             >::type   type;
79 #   endif
80         };
81
82     } // namespace fcqueue
83
84     /// Flat-combining queue
85     /**
86         @ingroup cds_nonintrusive_queue
87         @ingroup cds_flat_combining_container
88
89         \ref cds_flat_combining_description "Flat combining" sequential queue.
90         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
91
92         Template parameters:
93         - \p T - a value type stored in the queue
94         - \p Queue - sequential queue implementation, default is \p std::queue<T>
95         - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
96             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
97     */
98     template <typename T,
99         class Queue = std::queue<T>,
100         typename Traits = fcqueue::traits
101     >
102     class FCQueue
103 #ifndef CDS_DOXYGEN_INVOKED
104         : public cds::algo::flat_combining::container
105 #endif
106     {
107     public:
108         typedef T           value_type;     ///< Value type
109         typedef Queue       queue_type;     ///< Sequential queue class
110         typedef Traits      traits;         ///< Queue type traits
111
112         typedef typename traits::stat  stat;   ///< Internal statistics type
113         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
114
115     protected:
116         //@cond
117         /// Queue operation IDs
118         enum fc_operation {
119             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
120             op_enq_move,    ///< Enqueue (move semantics)
121             op_deq,         ///< Dequeue
122             op_clear,       ///< Clear
123             op_empty        ///< Empty
124         };
125
126         /// Flat combining publication list record
127         struct fc_record: public cds::algo::flat_combining::publication_record
128         {
129             union {
130                 value_type const *  pValEnq;  ///< Value to enqueue
131                 value_type *        pValDeq;  ///< Dequeue destination
132             };
133             bool            bEmpty; ///< \p true if the queue is empty
134         };
135         //@endcond
136
137         /// Flat combining kernel
138         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
139
140     protected:
141         //@cond
142         fc_kernel   m_FlatCombining;
143         queue_type  m_Queue;
144         //@endcond
145
146     public:
147         /// Initializes empty queue object
148         FCQueue()
149         {}
150
151         /// Initializes empty queue object and gives flat combining parameters
152         FCQueue(
153             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
154             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
155             )
156             : m_FlatCombining( nCompactFactor, nCombinePassCount )
157         {}
158
159         /// Inserts a new element at the end of the queue
160         /**
161             The content of the new element initialized to a copy of \p val.
162
163             The function always returns \p true
164         */
165         bool enqueue( value_type const& val )
166         {
167             fc_record * pRec = m_FlatCombining.acquire_record();
168             pRec->pValEnq = &val;
169
170             if ( c_bEliminationEnabled )
171                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
172             else
173                 m_FlatCombining.combine( op_enq, pRec, *this );
174
175             assert( pRec->is_done() );
176             m_FlatCombining.release_record( pRec );
177             m_FlatCombining.internal_statistics().onEnqueue();
178             return true;
179         }
180
181         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
182         bool push( value_type const& val )
183         {
184             return enqueue( val );
185         }
186
187         /// Inserts a new element at the end of the queue (move semantics)
188         /**
189             \p val is moved to inserted element
190         */
191         bool enqueue( value_type&& val )
192         {
193             fc_record * pRec = m_FlatCombining.acquire_record();
194             pRec->pValEnq = &val;
195
196             if ( c_bEliminationEnabled )
197                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
198             else
199                 m_FlatCombining.combine( op_enq_move, pRec, *this );
200
201             assert( pRec->is_done() );
202             m_FlatCombining.release_record( pRec );
203
204             m_FlatCombining.internal_statistics().onEnqMove();
205             return true;
206         }
207
208         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
209         bool push( value_type&& val )
210         {
211             return enqueue( val );
212         }
213
214         /// Removes the next element from the queue
215         /**
216             \p val takes a copy of the element
217         */
218         bool dequeue( value_type& val )
219         {
220             fc_record * pRec = m_FlatCombining.acquire_record();
221             pRec->pValDeq = &val;
222
223             if ( c_bEliminationEnabled )
224                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
225             else
226                 m_FlatCombining.combine( op_deq, pRec, *this );
227
228             assert( pRec->is_done() );
229             m_FlatCombining.release_record( pRec );
230
231             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
232             return !pRec->bEmpty;
233         }
234
235         /// Removes the next element from the queue (a synonym for \ref dequeue)
236         bool pop( value_type& val )
237         {
238             return dequeue( val );
239         }
240
241         /// Clears the queue
242         void clear()
243         {
244             fc_record * pRec = m_FlatCombining.acquire_record();
245
246             if ( c_bEliminationEnabled )
247                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
248             else
249                 m_FlatCombining.combine( op_clear, pRec, *this );
250
251             assert( pRec->is_done() );
252             m_FlatCombining.release_record( pRec );
253         }
254
255         /// Returns the number of elements in the queue.
256         /**
257             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
258             combining record can be in process.
259             To check emptiness use \ref empty function.
260         */
261         size_t size() const
262         {
263             return m_Queue.size();
264         }
265
266         /// Checks if the queue is empty
267         /**
268             If the combining is in process the function waits while combining done.
269         */
270         bool empty()
271         {
272             fc_record * pRec = m_FlatCombining.acquire_record();
273
274             if ( c_bEliminationEnabled )
275                 m_FlatCombining.batch_combine( op_empty, pRec, *this );
276             else
277                 m_FlatCombining.combine( op_empty, pRec, *this );
278
279             assert( pRec->is_done() );
280             m_FlatCombining.release_record( pRec );
281             return pRec->bEmpty;
282         }
283
284         /// Internal statistics
285         stat const& statistics() const
286         {
287             return m_FlatCombining.statistics();
288         }
289
290     public: // flat combining cooperation, not for direct use!
291         //@cond
292         /// Flat combining supporting function. Do not call it directly!
293         /**
294             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
295             object if the current thread becomes a combiner. Invocation of the function means that
296             the queue should perform an action recorded in \p pRec.
297         */
298         void fc_apply( fc_record * pRec )
299         {
300             assert( pRec );
301
302             // this function is called under FC mutex, so switch TSan off
303             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
304
305             switch ( pRec->op() ) {
306             case op_enq:
307                 assert( pRec->pValEnq );
308                 m_Queue.push( *(pRec->pValEnq ) );
309                 break;
310             case op_enq_move:
311                 assert( pRec->pValEnq );
312                 m_Queue.push( std::move( *(pRec->pValEnq )) );
313                 break;
314             case op_deq:
315                 assert( pRec->pValDeq );
316                 pRec->bEmpty = m_Queue.empty();
317                 if ( !pRec->bEmpty ) {
318                     *(pRec->pValDeq) = m_Queue.front();
319                     m_Queue.pop();
320                 }
321                 break;
322             case op_clear:
323                 while ( !m_Queue.empty() )
324                     m_Queue.pop();
325                 break;
326             case op_empty:
327                 pRec->bEmpty = m_Queue.empty();
328                 break;
329             default:
330                 assert(false);
331                 break;
332             }
333             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
334         }
335
336         /// Batch-processing flat combining
337         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
338         {
339             typedef typename fc_kernel::iterator fc_iterator;
340
341             // this function is called under FC mutex, so switch TSan off
342             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
343
344             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
345                 switch ( it->op() ) {
346                 case op_enq:
347                 case op_enq_move:
348                 case op_deq:
349                     if ( m_Queue.empty() ) {
350                         if ( itPrev != itEnd && collide( *itPrev, *it ))
351                             itPrev = itEnd;
352                         else
353                             itPrev = it;
354                     }
355                     break;
356                 }
357             }
358             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
359         }
360         //@endcond
361
362     private:
363         //@cond
364         bool collide( fc_record& rec1, fc_record& rec2 )
365         {
366             switch ( rec1.op() ) {
367                 case op_enq:
368                     if ( rec2.op() == op_deq ) {
369                         assert(rec1.pValEnq);
370                         assert(rec2.pValDeq);
371                         *rec2.pValDeq = *rec1.pValEnq;
372                         rec2.bEmpty = false;
373                         goto collided;
374                     }
375                     break;
376                 case op_enq_move:
377                     if ( rec2.op() == op_deq ) {
378                         assert(rec1.pValEnq);
379                         assert(rec2.pValDeq);
380                         *rec2.pValDeq = std::move( *rec1.pValEnq );
381                         rec2.bEmpty = false;
382                         goto collided;
383                     }
384                     break;
385                 case op_deq:
386                     switch ( rec2.op() ) {
387                     case op_enq:
388                     case op_enq_move:
389                         return collide( rec2, rec1 );
390                     }
391             }
392             return false;
393
394         collided:
395             m_FlatCombining.operation_done( rec1 );
396             m_FlatCombining.operation_done( rec2 );
397             m_FlatCombining.internal_statistics().onCollide();
398             return true;
399         }
400         //@endcond
401
402     };
403 }} // namespace cds::container
404
405 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H