Move libcds 1.6.0 from SVN
[libcds.git] / cds / container / fcqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_CONTAINER_FCQUEUE_H
4 #define __CDS_CONTAINER_FCQUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <queue>
9
10 namespace cds { namespace container {
11
12     /// FCQueue related definitions
13     /** @ingroup cds_nonintrusive_helper
14     */
15     namespace fcqueue {
16
17         /// FCQueue internal statistics
18         template <typename Counter = cds::atomicity::event_counter >
19         struct stat: public cds::algo::flat_combining::stat<Counter>
20         {
21             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
22             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
23
24             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
25             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
26             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
27             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
28             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
29
30             //@cond
31             void    onEnqueue()               { ++m_nEnqueue; }
32             void    onEnqMove()               { ++m_nEnqMove; }
33             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
34             void    onCollide()               { ++m_nCollided; }
35             //@endcond
36         };
37
38         /// FCQueue dummy statistics, no overhead
39         struct empty_stat: public cds::algo::flat_combining::empty_stat
40         {
41             //@cond
42             void    onEnqueue()     {}
43             void    onEnqMove()     {}
44             void    onDequeue(bool) {}
45             void    onCollide()     {}
46             //@endcond
47         };
48
49         /// FCQueue type traits
50         struct type_traits: public cds::algo::flat_combining::type_traits
51         {
52             typedef empty_stat      stat;   ///< Internal statistics
53             static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
54         };
55
56         /// Metafunction converting option list to traits
57         /**
58             This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
59             \p Options are:
60             - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
61             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
62             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
63             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
64             - \p opt::memory_model - C++ memory ordering model.
65                 List of all available memory ordering see opt::memory_model.
66                 Default if cds::opt::v:relaxed_ordering
67             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
68                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
69                 is empty.
70         */
71         template <CDS_DECL_OPTIONS8>
72         struct make_traits {
73 #   ifdef CDS_DOXYGEN_INVOKED
74             typedef implementation_defined type ;   ///< Metafunction result
75 #   else
76             typedef typename cds::opt::make_options<
77                 typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS8 >::type
78                 ,CDS_OPTIONS8
79             >::type   type;
80 #   endif
81         };
82
83     } // namespace fcqueue
84
85     /// Flat-combining queue
86     /**
87         @ingroup cds_nonintrusive_queue
88         @ingroup cds_flat_combining_container
89
90         \ref cds_flat_combining_description "Flat combining" sequential queue.
91         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
92
93         Template parameters:
94         - \p T - a value type stored in the queue
95         - \p Queue - sequential queue implementation, default is \p std::queue<T>
96         - \p Trats - type traits of flat combining, default is \p fcqueue::type_traits.
97             \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
98     */
99     template <typename T,
100         class Queue = std::queue<T>,
101         typename Traits = fcqueue::type_traits
102     >
103     class FCQueue
104 #ifndef CDS_DOXYGEN_INVOKED
105         : public cds::algo::flat_combining::container
106 #endif
107     {
108     public:
109         typedef T           value_type;     ///< Value type
110         typedef Queue       queue_type;     ///< Sequential queue class
111         typedef Traits      type_traits;    ///< Queue type traits
112
113         typedef typename type_traits::stat  stat;   ///< Internal statistics type
114         static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
115
116     protected:
117         //@cond
118         /// Queue operation IDs
119         enum fc_operation {
120             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
121             op_enq_move,   ///< Enqueue (move semantics)
122             op_deq,         ///< Dequeue
123             op_clear        ///< Clear
124         };
125
126         /// Flat combining publication list record
127         struct fc_record: public cds::algo::flat_combining::publication_record
128         {
129             union {
130                 value_type const *  pValEnq;  ///< Value to enqueue
131                 value_type *        pValDeq;  ///< Dequeue destination
132             };
133             bool            bEmpty; ///< \p true if the queue is empty
134         };
135         //@endcond
136
137         /// Flat combining kernel
138         typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
139
140     protected:
141         //@cond
142         fc_kernel   m_FlatCombining;
143         queue_type  m_Queue;
144         //@endcond
145
146     public:
147         /// Initializes empty queue object
148         FCQueue()
149         {}
150
151         /// Initializes empty queue object and gives flat combining parameters
152         FCQueue(
153             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
154             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
155             )
156             : m_FlatCombining( nCompactFactor, nCombinePassCount )
157         {}
158
159         /// Inserts a new element at the end of the queue
160         /**
161             The content of the new element initialized to a copy of \p val.
162
163             The function always returns \p true
164         */
165         bool enqueue( value_type const& val )
166         {
167             fc_record * pRec = m_FlatCombining.acquire_record();
168             pRec->pValEnq = &val;
169
170             if ( c_bEliminationEnabled )
171                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
172             else
173                 m_FlatCombining.combine( op_enq, pRec, *this );
174
175             assert( pRec->is_done() );
176             m_FlatCombining.release_record( pRec );
177             m_FlatCombining.internal_statistics().onEnqueue();
178             return true;
179         }
180
181         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
182         bool push( value_type const& val )
183         {
184             return enqueue( val );
185         }
186
187 #   ifdef CDS_MOVE_SEMANTICS_SUPPORT
188         /// Inserts a new element at the end of the queue (move semantics)
189         /**
190             \p val is moved to inserted element
191         */
192         bool enqueue( value_type&& val )
193         {
194             fc_record * pRec = m_FlatCombining.acquire_record();
195             pRec->pValEnq = &val;
196
197             if ( c_bEliminationEnabled )
198                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
199             else
200                 m_FlatCombining.combine( op_enq_move, pRec, *this );
201
202             assert( pRec->is_done() );
203             m_FlatCombining.release_record( pRec );
204
205             m_FlatCombining.internal_statistics().onEnqMove();
206             return true;
207         }
208
209         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
210         bool push( value_type&& val )
211         {
212             return enqueue( val );
213         }
214 #   endif
215
216         /// Removes the next element from the queue
217         /**
218             \p val takes a copy of the element
219         */
220         bool dequeue( value_type& val )
221         {
222             fc_record * pRec = m_FlatCombining.acquire_record();
223             pRec->pValDeq = &val;
224
225             if ( c_bEliminationEnabled )
226                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
227             else
228                 m_FlatCombining.combine( op_deq, pRec, *this );
229
230             assert( pRec->is_done() );
231             m_FlatCombining.release_record( pRec );
232
233             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
234             return !pRec->bEmpty;
235         }
236
237         /// Removes the next element from the queue (a synonym for \ref dequeue)
238         bool pop( value_type& val )
239         {
240             return dequeue( val );
241         }
242
243         /// Clears the queue
244         void clear()
245         {
246             fc_record * pRec = m_FlatCombining.acquire_record();
247
248             if ( c_bEliminationEnabled )
249                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
250             else
251                 m_FlatCombining.combine( op_clear, pRec, *this );
252
253             assert( pRec->is_done() );
254             m_FlatCombining.release_record( pRec );
255         }
256
257         /// Returns the number of elements in the queue.
258         /**
259             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
260             combining record can be in process.
261             To check emptiness use \ref empty function.
262         */
263         size_t size() const
264         {
265             return m_Queue.size();
266         }
267
268         /// Checks if the queue is empty
269         /**
270             If the combining is in process the function waits while combining done.
271         */
272         bool empty() const
273         {
274             m_FlatCombining.wait_while_combining();
275             return m_Queue.empty();
276         }
277
278         /// Internal statistics
279         stat const& statistics() const
280         {
281             return m_FlatCombining.statistics();
282         }
283
284     public: // flat combining cooperation, not for direct use!
285         //@cond
286         /// Flat combining supporting function. Do not call it directly!
287         /**
288             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
289             object if the current thread becomes a combiner. Invocation of the function means that
290             the queue should perform an action recorded in \p pRec.
291         */
292         void fc_apply( fc_record * pRec )
293         {
294             assert( pRec );
295
296             switch ( pRec->op() ) {
297             case op_enq:
298                 assert( pRec->pValEnq );
299                 m_Queue.push( *(pRec->pValEnq ) );
300                 break;
301 #       ifdef CDS_MOVE_SEMANTICS_SUPPORT
302             case op_enq_move:
303                 assert( pRec->pValEnq );
304                 m_Queue.push( std::move( *(pRec->pValEnq )) );
305                 break;
306 #       endif
307             case op_deq:
308                 assert( pRec->pValDeq );
309                 pRec->bEmpty = m_Queue.empty();
310                 if ( !pRec->bEmpty ) {
311                     *(pRec->pValDeq) = m_Queue.front();
312                     m_Queue.pop();
313                 }
314                 break;
315             case op_clear:
316                 while ( !m_Queue.empty() )
317                     m_Queue.pop();
318                 break;
319             default:
320                 assert(false);
321                 break;
322             }
323         }
324
325         /// Batch-processing flat combining
326         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
327         {
328             typedef typename fc_kernel::iterator fc_iterator;
329             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
330                 switch ( it->op() ) {
331                 case op_enq:
332                 case op_enq_move:
333                 case op_deq:
334                     if ( m_Queue.empty() ) {
335                         if ( itPrev != itEnd && collide( *itPrev, *it ))
336                             itPrev = itEnd;
337                         else
338                             itPrev = it;
339                     }
340                     break;
341                 }
342             }
343         }
344         //@endcond
345
346     private:
347         //@cond
348         bool collide( fc_record& rec1, fc_record& rec2 )
349         {
350             switch ( rec1.op() ) {
351                 case op_enq:
352                     if ( rec2.op() == op_deq ) {
353                         assert(rec1.pValEnq);
354                         assert(rec2.pValDeq);
355                         *rec2.pValDeq = *rec1.pValEnq;
356                         rec2.bEmpty = false;
357                         goto collided;
358                     }
359                     break;
360 #       ifdef CDS_MOVE_SEMANTICS_SUPPORT
361                 case op_enq_move:
362                     if ( rec2.op() == op_deq ) {
363                         assert(rec1.pValEnq);
364                         assert(rec2.pValDeq);
365                         *rec2.pValDeq = std::move( *rec1.pValEnq );
366                         rec2.bEmpty = false;
367                         goto collided;
368                     }
369                     break;
370 #       endif
371                 case op_deq:
372                     switch ( rec2.op() ) {
373                     case op_enq:
374 #       ifdef CDS_MOVE_SEMANTICS_SUPPORT
375                     case op_enq_move:
376 #       endif
377                         return collide( rec2, rec1 );
378                     }
379             }
380             return false;
381
382         collided:
383             m_FlatCombining.operation_done( rec1 );
384             m_FlatCombining.operation_done( rec2 );
385             m_FlatCombining.internal_statistics().onCollide();
386             return true;
387         }
388         //@endcond
389
390     };
391 }} // namespace cds::container
392
393 #endif // #ifndef __CDS_CONTAINER_FCQUEUE_H