dff95edea937173e8b044efd083dffcfdbf567ad
[libcds.git] / cds / container / fcqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
32 #define CDSLIB_CONTAINER_FCQUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <queue>
37
38 namespace cds { namespace container {
39
40     /// FCQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcqueue {
44
45         /// FCQueue internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
53             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
54             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
55             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
56             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
57
58             //@cond
59             void    onEnqueue()               { ++m_nEnqueue; }
60             void    onEnqMove()               { ++m_nEnqMove; }
61             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
62             void    onCollide()               { ++m_nCollided; }
63             //@endcond
64         };
65
66         /// FCQueue dummy statistics, no overhead
67         struct empty_stat: public cds::algo::flat_combining::empty_stat
68         {
69             //@cond
70             void    onEnqueue()     {}
71             void    onEnqMove()     {}
72             void    onDequeue(bool) {}
73             void    onCollide()     {}
74             //@endcond
75         };
76
77         /// FCQueue type traits
78         struct traits: public cds::algo::flat_combining::traits
79         {
80             typedef empty_stat      stat;   ///< Internal statistics
81             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
82         };
83
84         /// Metafunction converting option list to traits
85         /**
86             \p Options are:
87             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
88             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
89             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
90             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
91             - \p opt::memory_model - C++ memory ordering model.
92                 List of all available memory ordering see \p opt::memory_model.
93                 Default is \p cds::opt::v:relaxed_ordering
94             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
95                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
96                 is empty.
97         */
98         template <typename... Options>
99         struct make_traits {
100 #   ifdef CDS_DOXYGEN_INVOKED
101             typedef implementation_defined type ;   ///< Metafunction result
102 #   else
103             typedef typename cds::opt::make_options<
104                 typename cds::opt::find_type_traits< traits, Options... >::type
105                 ,Options...
106             >::type   type;
107 #   endif
108         };
109
110     } // namespace fcqueue
111
112     /// Flat-combining queue
113     /**
114         @ingroup cds_nonintrusive_queue
115         @ingroup cds_flat_combining_container
116
117         \ref cds_flat_combining_description "Flat combining" sequential queue.
118         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
119
120         Template parameters:
121         - \p T - a value type stored in the queue
122         - \p Queue - sequential queue implementation, default is \p std::queue<T>
123         - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
124             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
125     */
126     template <typename T,
127         class Queue = std::queue<T>,
128         typename Traits = fcqueue::traits
129     >
130     class FCQueue
131 #ifndef CDS_DOXYGEN_INVOKED
132         : public cds::algo::flat_combining::container
133 #endif
134     {
135     public:
136         typedef T           value_type;     ///< Value type
137         typedef Queue       queue_type;     ///< Sequential queue class
138         typedef Traits      traits;         ///< Queue type traits
139
140         typedef typename traits::stat  stat;   ///< Internal statistics type
141         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
142
143     protected:
144         //@cond
145         /// Queue operation IDs
146         enum fc_operation {
147             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
148             op_enq_move,    ///< Enqueue (move semantics)
149             op_deq,         ///< Dequeue
150             op_clear,       ///< Clear
151             op_empty        ///< Empty
152         };
153
154         /// Flat combining publication list record
155         struct fc_record: public cds::algo::flat_combining::publication_record
156         {
157             union {
158                 value_type const *  pValEnq;  ///< Value to enqueue
159                 value_type *        pValDeq;  ///< Dequeue destination
160             };
161             bool            bEmpty; ///< \p true if the queue is empty
162         };
163         //@endcond
164
165         /// Flat combining kernel
166         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
167
168     protected:
169         //@cond
170         fc_kernel   m_FlatCombining;
171         queue_type  m_Queue;
172         //@endcond
173
174     public:
175         /// Initializes empty queue object
176         FCQueue()
177         {}
178
179         /// Initializes empty queue object and gives flat combining parameters
180         FCQueue(
181             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
182             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
183             )
184             : m_FlatCombining( nCompactFactor, nCombinePassCount )
185         {}
186
187         /// Inserts a new element at the end of the queue
188         /**
189             The content of the new element initialized to a copy of \p val.
190
191             The function always returns \p true
192         */
193         bool enqueue( value_type const& val )
194         {
195             fc_record * pRec = m_FlatCombining.acquire_record();
196             pRec->pValEnq = &val;
197
198             if ( c_bEliminationEnabled )
199                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
200             else
201                 m_FlatCombining.combine( op_enq, pRec, *this );
202
203             assert( pRec->is_done() );
204             m_FlatCombining.release_record( pRec );
205             m_FlatCombining.internal_statistics().onEnqueue();
206             return true;
207         }
208
209         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
210         bool push( value_type const& val )
211         {
212             return enqueue( val );
213         }
214
215         /// Inserts a new element at the end of the queue (move semantics)
216         /**
217             \p val is moved to inserted element
218         */
219         bool enqueue( value_type&& val )
220         {
221             fc_record * pRec = m_FlatCombining.acquire_record();
222             pRec->pValEnq = &val;
223
224             if ( c_bEliminationEnabled )
225                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
226             else
227                 m_FlatCombining.combine( op_enq_move, pRec, *this );
228
229             assert( pRec->is_done() );
230             m_FlatCombining.release_record( pRec );
231
232             m_FlatCombining.internal_statistics().onEnqMove();
233             return true;
234         }
235
236         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
237         bool push( value_type&& val )
238         {
239             return enqueue( val );
240         }
241
242         /// Removes the next element from the queue
243         /**
244             \p val takes a copy of the element
245         */
246         bool dequeue( value_type& val )
247         {
248             fc_record * pRec = m_FlatCombining.acquire_record();
249             pRec->pValDeq = &val;
250
251             if ( c_bEliminationEnabled )
252                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
253             else
254                 m_FlatCombining.combine( op_deq, pRec, *this );
255
256             assert( pRec->is_done() );
257             m_FlatCombining.release_record( pRec );
258
259             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
260             return !pRec->bEmpty;
261         }
262
263         /// Removes the next element from the queue (a synonym for \ref dequeue)
264         bool pop( value_type& val )
265         {
266             return dequeue( val );
267         }
268
269         /// Clears the queue
270         void clear()
271         {
272             fc_record * pRec = m_FlatCombining.acquire_record();
273
274             if ( c_bEliminationEnabled )
275                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
276             else
277                 m_FlatCombining.combine( op_clear, pRec, *this );
278
279             assert( pRec->is_done() );
280             m_FlatCombining.release_record( pRec );
281         }
282
283         /// Returns the number of elements in the queue.
284         /**
285             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
286             combining record can be in process.
287             To check emptiness use \ref empty function.
288         */
289         size_t size() const
290         {
291             return m_Queue.size();
292         }
293
294         /// Checks if the queue is empty
295         /**
296             If the combining is in process the function waits while combining done.
297         */
298         bool empty()
299         {
300             fc_record * pRec = m_FlatCombining.acquire_record();
301
302             if ( c_bEliminationEnabled )
303                 m_FlatCombining.batch_combine( op_empty, pRec, *this );
304             else
305                 m_FlatCombining.combine( op_empty, pRec, *this );
306
307             assert( pRec->is_done() );
308             m_FlatCombining.release_record( pRec );
309             return pRec->bEmpty;
310         }
311
312         /// Internal statistics
313         stat const& statistics() const
314         {
315             return m_FlatCombining.statistics();
316         }
317
318     public: // flat combining cooperation, not for direct use!
319         //@cond
320         /// Flat combining supporting function. Do not call it directly!
321         /**
322             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
323             object if the current thread becomes a combiner. Invocation of the function means that
324             the queue should perform an action recorded in \p pRec.
325         */
326         void fc_apply( fc_record * pRec )
327         {
328             assert( pRec );
329
330             // this function is called under FC mutex, so switch TSan off
331             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
332
333             switch ( pRec->op() ) {
334             case op_enq:
335                 assert( pRec->pValEnq );
336                 m_Queue.push( *(pRec->pValEnq ) );
337                 break;
338             case op_enq_move:
339                 assert( pRec->pValEnq );
340                 m_Queue.push( std::move( *(pRec->pValEnq )) );
341                 break;
342             case op_deq:
343                 assert( pRec->pValDeq );
344                 pRec->bEmpty = m_Queue.empty();
345                 if ( !pRec->bEmpty ) {
346                     *(pRec->pValDeq) = std::move( m_Queue.front());
347                     m_Queue.pop();
348                 }
349                 break;
350             case op_clear:
351                 while ( !m_Queue.empty() )
352                     m_Queue.pop();
353                 break;
354             case op_empty:
355                 pRec->bEmpty = m_Queue.empty();
356                 break;
357             default:
358                 assert(false);
359                 break;
360             }
361             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
362         }
363
364         /// Batch-processing flat combining
365         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
366         {
367             typedef typename fc_kernel::iterator fc_iterator;
368
369             // this function is called under FC mutex, so switch TSan off
370             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
371
372             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
373                 switch ( it->op() ) {
374                 case op_enq:
375                 case op_enq_move:
376                 case op_deq:
377                     if ( m_Queue.empty() ) {
378                         if ( itPrev != itEnd && collide( *itPrev, *it ))
379                             itPrev = itEnd;
380                         else
381                             itPrev = it;
382                     }
383                     break;
384                 }
385             }
386             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
387         }
388         //@endcond
389
390     private:
391         //@cond
392         bool collide( fc_record& rec1, fc_record& rec2 )
393         {
394             switch ( rec1.op() ) {
395                 case op_enq:
396                     if ( rec2.op() == op_deq ) {
397                         assert(rec1.pValEnq);
398                         assert(rec2.pValDeq);
399                         *rec2.pValDeq = *rec1.pValEnq;
400                         rec2.bEmpty = false;
401                         goto collided;
402                     }
403                     break;
404                 case op_enq_move:
405                     if ( rec2.op() == op_deq ) {
406                         assert(rec1.pValEnq);
407                         assert(rec2.pValDeq);
408                         *rec2.pValDeq = std::move( *rec1.pValEnq );
409                         rec2.bEmpty = false;
410                         goto collided;
411                     }
412                     break;
413                 case op_deq:
414                     switch ( rec2.op() ) {
415                     case op_enq:
416                     case op_enq_move:
417                         return collide( rec2, rec1 );
418                     }
419             }
420             return false;
421
422         collided:
423             m_FlatCombining.operation_done( rec1 );
424             m_FlatCombining.operation_done( rec2 );
425             m_FlatCombining.internal_statistics().onCollide();
426             return true;
427         }
428         //@endcond
429
430     };
431 }} // namespace cds::container
432
433 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H