Added a comment
[libcds.git] / cds / container / fcqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCQUEUE_H
32 #define CDSLIB_CONTAINER_FCQUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <queue>
37
38 namespace cds { namespace container {
39
40     /// FCQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcqueue {
44
45         /// FCQueue internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nEnqueue     ;   ///< Count of enqueue operations
53             counter_type    m_nEnqMove     ;   ///< Count of enqueue operations with move semantics
54             counter_type    m_nDequeue     ;   ///< Count of success dequeue operations
55             counter_type    m_nFailedDeq   ;   ///< Count of failed dequeue operations (pop from empty queue)
56             counter_type    m_nCollided    ;   ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
57
58             //@cond
59             void    onEnqueue()               { ++m_nEnqueue; }
60             void    onEnqMove()               { ++m_nEnqMove; }
61             void    onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
62             void    onCollide()               { ++m_nCollided; }
63             //@endcond
64         };
65
66         /// FCQueue dummy statistics, no overhead
67         struct empty_stat: public cds::algo::flat_combining::empty_stat
68         {
69             //@cond
70             void    onEnqueue()     {}
71             void    onEnqMove()     {}
72             void    onDequeue(bool) {}
73             void    onCollide()     {}
74             //@endcond
75         };
76
77         /// FCQueue type traits
78         struct traits: public cds::algo::flat_combining::traits
79         {
80             typedef empty_stat      stat;   ///< Internal statistics
81             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
82         };
83
84         /// Metafunction converting option list to traits
85         /**
86             \p Options are:
87             - any \p cds::algo::flat_combining::make_traits options
88             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
89             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
90                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
91                 is empty.
92         */
93         template <typename... Options>
94         struct make_traits {
95 #   ifdef CDS_DOXYGEN_INVOKED
96             typedef implementation_defined type ;   ///< Metafunction result
97 #   else
98             typedef typename cds::opt::make_options<
99                 typename cds::opt::find_type_traits< traits, Options... >::type
100                 ,Options...
101             >::type   type;
102 #   endif
103         };
104
105     } // namespace fcqueue
106
107     /// Flat-combining queue
108     /**
109         @ingroup cds_nonintrusive_queue
110         @ingroup cds_flat_combining_container
111
112         \ref cds_flat_combining_description "Flat combining" sequential queue.
113         The class can be considered as a concurrent FC-based wrapper for \p std::queue.
114
115         Template parameters:
116         - \p T - a value type stored in the queue
117         - \p Queue - sequential queue implementation, default is \p std::queue<T>
118         - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
119             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
120     */
121     template <typename T,
122         class Queue = std::queue<T>,
123         typename Traits = fcqueue::traits
124     >
125     class FCQueue
126 #ifndef CDS_DOXYGEN_INVOKED
127         : public cds::algo::flat_combining::container
128 #endif
129     {
130     public:
131         typedef T           value_type;     ///< Value type
132         typedef Queue       queue_type;     ///< Sequential queue class
133         typedef Traits      traits;         ///< Queue type traits
134
135         typedef typename traits::stat  stat;   ///< Internal statistics type
136         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
137
138     protected:
139         //@cond
140         /// Queue operation IDs
141         enum fc_operation {
142             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
143             op_enq_move,    ///< Enqueue (move semantics)
144             op_deq,         ///< Dequeue
145             op_clear        ///< Clear
146         };
147
148         /// Flat combining publication list record
149         struct fc_record: public cds::algo::flat_combining::publication_record
150         {
151             union {
152                 value_type const *  pValEnq;  ///< Value to enqueue
153                 value_type *        pValDeq;  ///< Dequeue destination
154             };
155             bool            bEmpty; ///< \p true if the queue is empty
156         };
157         //@endcond
158
159         /// Flat combining kernel
160         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
161
162     protected:
163         //@cond
164         mutable fc_kernel m_FlatCombining;
165         queue_type        m_Queue;
166         //@endcond
167
168     public:
169         /// Initializes empty queue object
170         FCQueue()
171         {}
172
173         /// Initializes empty queue object and gives flat combining parameters
174         FCQueue(
175             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
176             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
177             )
178             : m_FlatCombining( nCompactFactor, nCombinePassCount )
179         {}
180
181         /// Inserts a new element at the end of the queue
182         /**
183             The content of the new element initialized to a copy of \p val.
184
185             The function always returns \p true
186         */
187         bool enqueue( value_type const& val )
188         {
189             auto pRec = m_FlatCombining.acquire_record();
190             pRec->pValEnq = &val;
191
192             if ( c_bEliminationEnabled )
193                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
194             else
195                 m_FlatCombining.combine( op_enq, pRec, *this );
196
197             assert( pRec->is_done());
198             m_FlatCombining.release_record( pRec );
199             m_FlatCombining.internal_statistics().onEnqueue();
200             return true;
201         }
202
203         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
204         bool push( value_type const& val )
205         {
206             return enqueue( val );
207         }
208
209         /// Inserts a new element at the end of the queue (move semantics)
210         /**
211             \p val is moved to inserted element
212         */
213         bool enqueue( value_type&& val )
214         {
215             auto pRec = m_FlatCombining.acquire_record();
216             pRec->pValEnq = &val;
217
218             if ( c_bEliminationEnabled )
219                 m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
220             else
221                 m_FlatCombining.combine( op_enq_move, pRec, *this );
222
223             assert( pRec->is_done());
224             m_FlatCombining.release_record( pRec );
225
226             m_FlatCombining.internal_statistics().onEnqMove();
227             return true;
228         }
229
230         /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
231         bool push( value_type&& val )
232         {
233             return enqueue( val );
234         }
235
236         /// Removes the next element from the queue
237         /**
238             \p val takes a copy of the element
239         */
240         bool dequeue( value_type& val )
241         {
242             auto pRec = m_FlatCombining.acquire_record();
243             pRec->pValDeq = &val;
244
245             if ( c_bEliminationEnabled )
246                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
247             else
248                 m_FlatCombining.combine( op_deq, pRec, *this );
249
250             assert( pRec->is_done());
251             m_FlatCombining.release_record( pRec );
252
253             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
254             return !pRec->bEmpty;
255         }
256
257         /// Removes the next element from the queue (a synonym for \ref dequeue)
258         bool pop( value_type& val )
259         {
260             return dequeue( val );
261         }
262
263         /// Clears the queue
264         void clear()
265         {
266             auto pRec = m_FlatCombining.acquire_record();
267
268             if ( c_bEliminationEnabled )
269                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
270             else
271                 m_FlatCombining.combine( op_clear, pRec, *this );
272
273             assert( pRec->is_done());
274             m_FlatCombining.release_record( pRec );
275         }
276
277         /// Returns the number of elements in the queue.
278         /**
279             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
280             combining record can be in process.
281             To check emptiness use \ref empty function.
282         */
283         size_t size() const
284         {
285             return m_Queue.size();
286         }
287
288         /// Checks if the queue is empty
289         /**
290             If the combining is in process the function waits while combining done.
291         */
292         bool empty() const
293         {
294             bool bRet = false;
295             auto const& queue = m_Queue;
296             m_FlatCombining.invoke_exclusive( [&queue, &bRet]() { bRet = queue.empty(); } );
297             return bRet;
298         }
299
300         /// Internal statistics
301         stat const& statistics() const
302         {
303             return m_FlatCombining.statistics();
304         }
305
306     public: // flat combining cooperation, not for direct use!
307         //@cond
308         /// Flat combining supporting function. Do not call it directly!
309         /**
310             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
311             object if the current thread becomes a combiner. Invocation of the function means that
312             the queue should perform an action recorded in \p pRec.
313         */
314         void fc_apply( fc_record * pRec )
315         {
316             assert( pRec );
317
318             switch ( pRec->op()) {
319             case op_enq:
320                 assert( pRec->pValEnq );
321                 m_Queue.push( *(pRec->pValEnq ));
322                 break;
323             case op_enq_move:
324                 assert( pRec->pValEnq );
325                 m_Queue.push( std::move( *(pRec->pValEnq )));
326                 break;
327             case op_deq:
328                 assert( pRec->pValDeq );
329                 pRec->bEmpty = m_Queue.empty();
330                 if ( !pRec->bEmpty ) {
331                     *(pRec->pValDeq) = std::move( m_Queue.front());
332                     m_Queue.pop();
333                 }
334                 break;
335             case op_clear:
336                 while ( !m_Queue.empty())
337                     m_Queue.pop();
338                 break;
339             default:
340                 assert(false);
341                 break;
342             }
343         }
344
345         /// Batch-processing flat combining
346         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
347         {
348             typedef typename fc_kernel::iterator fc_iterator;
349
350             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
351                 switch ( it->op( atomics::memory_order_acquire )) {
352                 case op_enq:
353                 case op_enq_move:
354                 case op_deq:
355                     if ( m_Queue.empty()) {
356                         if ( itPrev != itEnd && collide( *itPrev, *it ))
357                             itPrev = itEnd;
358                         else
359                             itPrev = it;
360                     }
361                     break;
362                 }
363             }
364         }
365         //@endcond
366
367     private:
368         //@cond
369         bool collide( fc_record& rec1, fc_record& rec2 )
370         {
371             switch ( rec1.op()) {
372                 case op_enq:
373                     if ( rec2.op() == op_deq ) {
374                         assert(rec1.pValEnq);
375                         assert(rec2.pValDeq);
376                         *rec2.pValDeq = *rec1.pValEnq;
377                         rec2.bEmpty = false;
378                         goto collided;
379                     }
380                     break;
381                 case op_enq_move:
382                     if ( rec2.op() == op_deq ) {
383                         assert(rec1.pValEnq);
384                         assert(rec2.pValDeq);
385                         *rec2.pValDeq = std::move( *rec1.pValEnq );
386                         rec2.bEmpty = false;
387                         goto collided;
388                     }
389                     break;
390                 case op_deq:
391                     switch ( rec2.op()) {
392                     case op_enq:
393                     case op_enq_move:
394                         return collide( rec2, rec1 );
395                     }
396             }
397             return false;
398
399         collided:
400             m_FlatCombining.operation_done( rec1 );
401             m_FlatCombining.operation_done( rec2 );
402             m_FlatCombining.internal_statistics().onCollide();
403             return true;
404         }
405         //@endcond
406
407     };
408 }} // namespace cds::container
409
410 #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H