Removed trailing spaces
[libcds.git] / cds / container / fcqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
32 #define CDSLIB_CONTAINER_FCQUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <queue>
37
38 namespace cds { namespace container {
39
40     /// FCQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcqueue {
44
45         /// FCQueue internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
53             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
54             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
55             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
56             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
57
58             //@cond
59             void    onEnqueue()               { ++m_nEnqueue; }
60             void    onEnqMove()               { ++m_nEnqMove; }
61             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
62             void    onCollide()               { ++m_nCollided; }
63             //@endcond
64         };
65
66         /// FCQueue dummy statistics, no overhead
67         struct empty_stat: public cds::algo::flat_combining::empty_stat
68         {
69             //@cond
70             void    onEnqueue()     {}
71             void    onEnqMove()     {}
72             void    onDequeue(bool) {}
73             void    onCollide()     {}
74             //@endcond
75         };
76
77         /// FCQueue type traits
78         struct traits: public cds::algo::flat_combining::traits
79         {
80             typedef empty_stat      stat;   ///< Internal statistics
81             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
82         };
83
84         /// Metafunction converting option list to traits
85         /**
86             \p Options are:
87             - any \p cds::algo::flat_combining::make_traits options
88             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
89             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
90                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
91                 is empty.
92         */
93         template <typename... Options>
94         struct make_traits {
95 #   ifdef CDS_DOXYGEN_INVOKED
96             typedef implementation_defined type ;   ///< Metafunction result
97 #   else
98             typedef typename cds::opt::make_options<
99                 typename cds::opt::find_type_traits< traits, Options... >::type
100                 ,Options...
101             >::type   type;
102 #   endif
103         };
104
105     } // namespace fcqueue
106
107     /// Flat-combining queue
108     /**
109         @ingroup cds_nonintrusive_queue
110         @ingroup cds_flat_combining_container
111
112         \ref cds_flat_combining_description "Flat combining" sequential queue.
113         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
114
115         Template parameters:
116         - \p T - a value type stored in the queue
117         - \p Queue - sequential queue implementation, default is \p std::queue<T>
118         - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
119             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
120     */
121     template <typename T,
122         class Queue = std::queue<T>,
123         typename Traits = fcqueue::traits
124     >
125     class FCQueue
126 #ifndef CDS_DOXYGEN_INVOKED
127         : public cds::algo::flat_combining::container
128 #endif
129     {
130     public:
131         typedef T           value_type;     ///< Value type
132         typedef Queue       queue_type;     ///< Sequential queue class
133         typedef Traits      traits;         ///< Queue type traits
134
135         typedef typename traits::stat  stat;   ///< Internal statistics type
136         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
137
138     protected:
139         //@cond
140         /// Queue operation IDs
141         enum fc_operation {
142             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
143             op_enq_move,    ///< Enqueue (move semantics)
144             op_deq,         ///< Dequeue
145             op_clear        ///< Clear
146         };
147
148         /// Flat combining publication list record
149         struct fc_record: public cds::algo::flat_combining::publication_record
150         {
151             union {
152                 value_type const *  pValEnq;  ///< Value to enqueue
153                 value_type *        pValDeq;  ///< Dequeue destination
154             };
155             bool            bEmpty; ///< \p true if the queue is empty
156         };
157         //@endcond
158
159         /// Flat combining kernel
160         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
161
162     protected:
163         //@cond
164         mutable fc_kernel m_FlatCombining;
165         queue_type        m_Queue;
166         //@endcond
167
168     public:
169         /// Initializes empty queue object
170         FCQueue()
171         {}
172
173         /// Initializes empty queue object and gives flat combining parameters
174         FCQueue(
175             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
176             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
177             )
178             : m_FlatCombining( nCompactFactor, nCombinePassCount )
179         {}
180
181         /// Inserts a new element at the end of the queue
182         /**
183             The content of the new element initialized to a copy of \p val.
184
185             The function always returns \p true
186         */
187         bool enqueue( value_type const& val )
188         {
189             auto pRec = m_FlatCombining.acquire_record();
190             pRec->pValEnq = &val;
191
192             if ( c_bEliminationEnabled )
193                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
194             else
195                 m_FlatCombining.combine( op_enq, pRec, *this );
196
197             assert( pRec->is_done() );
198             m_FlatCombining.release_record( pRec );
199             m_FlatCombining.internal_statistics().onEnqueue();
200             return true;
201         }
202
203         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
204         bool push( value_type const& val )
205         {
206             return enqueue( val );
207         }
208
209         /// Inserts a new element at the end of the queue (move semantics)
210         /**
211             \p val is moved to inserted element
212         */
213         bool enqueue( value_type&& val )
214         {
215             auto pRec = m_FlatCombining.acquire_record();
216             pRec->pValEnq = &val;
217
218             if ( c_bEliminationEnabled )
219                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
220             else
221                 m_FlatCombining.combine( op_enq_move, pRec, *this );
222
223             assert( pRec->is_done() );
224             m_FlatCombining.release_record( pRec );
225
226             m_FlatCombining.internal_statistics().onEnqMove();
227             return true;
228         }
229
230         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
231         bool push( value_type&& val )
232         {
233             return enqueue( val );
234         }
235
236         /// Removes the next element from the queue
237         /**
238             \p val takes a copy of the element
239         */
240         bool dequeue( value_type& val )
241         {
242             auto pRec = m_FlatCombining.acquire_record();
243             pRec->pValDeq = &val;
244
245             if ( c_bEliminationEnabled )
246                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
247             else
248                 m_FlatCombining.combine( op_deq, pRec, *this );
249
250             assert( pRec->is_done() );
251             m_FlatCombining.release_record( pRec );
252
253             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
254             return !pRec->bEmpty;
255         }
256
257         /// Removes the next element from the queue (a synonym for \ref dequeue)
258         bool pop( value_type& val )
259         {
260             return dequeue( val );
261         }
262
263         /// Clears the queue
264         void clear()
265         {
266             auto pRec = m_FlatCombining.acquire_record();
267
268             if ( c_bEliminationEnabled )
269                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
270             else
271                 m_FlatCombining.combine( op_clear, pRec, *this );
272
273             assert( pRec->is_done() );
274             m_FlatCombining.release_record( pRec );
275         }
276
277         /// Returns the number of elements in the queue.
278         /**
279             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
280             combining record can be in process.
281             To check emptiness use \ref empty function.
282         */
283         size_t size() const
284         {
285             return m_Queue.size();
286         }
287
288         /// Checks if the queue is empty
289         /**
290             If the combining is in process the function waits while combining done.
291         */
292         bool empty() const
293         {
294             bool bRet = false;
295             auto const& queue = m_Queue;
296             m_FlatCombining.invoke_exclusive( [&queue, &bRet]() { bRet = queue.empty(); } );
297             return bRet;
298         }
299
300         /// Internal statistics
301         stat const& statistics() const
302         {
303             return m_FlatCombining.statistics();
304         }
305
306     public: // flat combining cooperation, not for direct use!
307         //@cond
308         /// Flat combining supporting function. Do not call it directly!
309         /**
310             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
311             object if the current thread becomes a combiner. Invocation of the function means that
312             the queue should perform an action recorded in \p pRec.
313         */
314         void fc_apply( fc_record * pRec )
315         {
316             assert( pRec );
317
318             // this function is called under FC mutex, so switch TSan off
319             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
320
321             switch ( pRec->op() ) {
322             case op_enq:
323                 assert( pRec->pValEnq );
324                 m_Queue.push( *(pRec->pValEnq ) );
325                 break;
326             case op_enq_move:
327                 assert( pRec->pValEnq );
328                 m_Queue.push( std::move( *(pRec->pValEnq )) );
329                 break;
330             case op_deq:
331                 assert( pRec->pValDeq );
332                 pRec->bEmpty = m_Queue.empty();
333                 if ( !pRec->bEmpty ) {
334                     *(pRec->pValDeq) = std::move( m_Queue.front());
335                     m_Queue.pop();
336                 }
337                 break;
338             case op_clear:
339                 while ( !m_Queue.empty() )
340                     m_Queue.pop();
341                 break;
342             default:
343                 assert(false);
344                 break;
345             }
346             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
347         }
348
349         /// Batch-processing flat combining
350         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
351         {
352             typedef typename fc_kernel::iterator fc_iterator;
353
354             // this function is called under FC mutex, so switch TSan off
355             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
356
357             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
358                 switch ( it->op() ) {
359                 case op_enq:
360                 case op_enq_move:
361                 case op_deq:
362                     if ( m_Queue.empty() ) {
363                         if ( itPrev != itEnd && collide( *itPrev, *it ))
364                             itPrev = itEnd;
365                         else
366                             itPrev = it;
367                     }
368                     break;
369                 }
370             }
371             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
372         }
373         //@endcond
374
375     private:
376         //@cond
377         bool collide( fc_record& rec1, fc_record& rec2 )
378         {
379             switch ( rec1.op() ) {
380                 case op_enq:
381                     if ( rec2.op() == op_deq ) {
382                         assert(rec1.pValEnq);
383                         assert(rec2.pValDeq);
384                         *rec2.pValDeq = *rec1.pValEnq;
385                         rec2.bEmpty = false;
386                         goto collided;
387                     }
388                     break;
389                 case op_enq_move:
390                     if ( rec2.op() == op_deq ) {
391                         assert(rec1.pValEnq);
392                         assert(rec2.pValDeq);
393                         *rec2.pValDeq = std::move( *rec1.pValEnq );
394                         rec2.bEmpty = false;
395                         goto collided;
396                     }
397                     break;
398                 case op_deq:
399                     switch ( rec2.op() ) {
400                     case op_enq:
401                     case op_enq_move:
402                         return collide( rec2, rec1 );
403                     }
404             }
405             return false;
406
407         collided:
408             m_FlatCombining.operation_done( rec1 );
409             m_FlatCombining.operation_done( rec2 );
410             m_FlatCombining.internal_statistics().onCollide();
411             return true;
412         }
413         //@endcond
414
415     };
416 }} // namespace cds::container
417
418 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H