Added copyright and license
[libcds.git] / cds / intrusive / fcqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
32 #define CDSLIB_INTRUSIVE_FCQUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <cds/intrusive/options.h>
37 #include <boost/intrusive/list.hpp>
38
39 namespace cds { namespace intrusive {
40
41     /// FCQueue related definitions
42     namespace fcqueue {
43
44         /// FCQueue internal statistics
45         template <typename Counter = cds::atomicity::event_counter >
46         struct stat: public cds::algo::flat_combining::stat<Counter>
47         {
48             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
49             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
50
51             counter_type    m_nEnqueue     ;   ///< Count of push operations
52             counter_type    m_nDequeue     ;   ///< Count of success pop operations
53             counter_type    m_nFailedDeq   ;   ///< Count of failed pop operations (pop from empty queue)
54             counter_type    m_nCollided    ;   ///< How many pairs of push/pop were collided, if elimination is enabled
55
56             //@cond
57             void    onEnqueue()                 { ++m_nEnqueue; }
58             void    onDequeue( bool bFailed )   { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
59             void    onCollide()                 { ++m_nCollided; }
60             //@endcond
61         };
62
63         /// FCQueue dummy statistics, no overhead
64         struct empty_stat: public cds::algo::flat_combining::empty_stat
65         {
66             //@cond
67             void    onEnqueue()     {}
68             void    onDequeue(bool) {}
69             void    onCollide()     {}
70             //@endcond
71         };
72
73         /// FCQueue type traits
74         struct traits: public cds::algo::flat_combining::traits
75         {
76             typedef cds::intrusive::opt::v::empty_disposer  disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
77             typedef empty_stat      stat;   ///< Internal statistics
78             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
79         };
80
81         /// Metafunction converting option list to traits
82         /**
83             \p Options are:
84             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
85             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
86             - \p opt::disposer - the functor used for dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
87                 This option is used only in \p FCQueue::clear() function.
88             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
89             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
90             - \p opt::memory_model - C++ memory ordering model.
91                 List of all available memory ordering see \p opt::memory_model.
92                 Default is \p cds::opt::v:relaxed_ordering
93             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
94                 By default, the elimination is disabled.
95         */
96         template <typename... Options>
97         struct make_traits {
98 #   ifdef CDS_DOXYGEN_INVOKED
99             typedef implementation_defined type ;   ///< Metafunction result
100 #   else
101             typedef typename cds::opt::make_options<
102                 typename cds::opt::find_type_traits< traits, Options... >::type
103                 ,Options...
104             >::type   type;
105 #   endif
106         };
107     } // namespace fcqueue
108
109     /// Flat-combining intrusive queue
110     /**
111         @ingroup cds_intrusive_queue
112         @ingroup cds_flat_combining_intrusive
113
114         \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
115
116         Template parameters:
117         - \p T - a value type stored in the queue
118         - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
119             Default is \p boost::intrusive::list
120         - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
121             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
122     */
123     template <typename T
124         ,class Container = boost::intrusive::list<T>
125         ,typename Traits = fcqueue::traits
126     >
127     class FCQueue
128 #ifndef CDS_DOXYGEN_INVOKED
129         : public cds::algo::flat_combining::container
130 #endif
131     {
132     public:
133         typedef T           value_type;     ///< Value type
134         typedef Container   container_type; ///< Sequential container type
135         typedef Traits      traits;         ///< Queue traits
136
137         typedef typename traits::disposer   disposer;   ///< The disposer functor. The disposer is used only in \ref clear() function
138         typedef typename traits::stat       stat;   ///< Internal statistics type
139         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
140
141     protected:
142         //@cond
143         /// Queue operation IDs
144         enum fc_operation {
145             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
146             op_deq,                 ///< Dequeue
147             op_clear,               ///< Clear
148             op_clear_and_dispose    ///< Clear and dispose
149         };
150
151         /// Flat combining publication list record
152         struct fc_record: public cds::algo::flat_combining::publication_record
153         {
154             value_type * pVal;  ///< Value to enqueue or dequeue
155             bool         bEmpty; ///< \p true if the queue is empty
156         };
157         //@endcond
158
159         /// Flat combining kernel
160         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
161
162     protected:
163         //@cond
164         fc_kernel       m_FlatCombining;
165         container_type  m_Queue;
166         //@endcond
167
168     public:
169         /// Initializes empty queue object
170         FCQueue()
171         {}
172
173         /// Initializes empty queue object and gives flat combining parameters
174         FCQueue(
175             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
176             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
177             )
178             : m_FlatCombining( nCompactFactor, nCombinePassCount )
179         {}
180
181         /// Inserts a new element at the end of the queue
182         /**
183             The function always returns \p true.
184         */
185         bool enqueue( value_type& val )
186         {
187             fc_record * pRec = m_FlatCombining.acquire_record();
188             pRec->pVal = &val;
189
190             if ( c_bEliminationEnabled )
191                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
192             else
193                 m_FlatCombining.combine( op_enq, pRec, *this );
194
195             assert( pRec->is_done() );
196             m_FlatCombining.release_record( pRec );
197             m_FlatCombining.internal_statistics().onEnqueue();
198             return true;
199         }
200
201         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
202         bool push( value_type& val )
203         {
204             return enqueue( val );
205         }
206
207         /// Removes the next element from the queue
208         /**
209             If the queue is empty the function returns \p nullptr
210         */
211         value_type * dequeue()
212         {
213             fc_record * pRec = m_FlatCombining.acquire_record();
214             pRec->pVal = nullptr;
215
216             if ( c_bEliminationEnabled )
217                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
218             else
219                 m_FlatCombining.combine( op_deq, pRec, *this );
220
221             assert( pRec->is_done() );
222             m_FlatCombining.release_record( pRec );
223
224             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
225             return pRec->pVal;
226         }
227
228         /// Removes the next element from the queue (a synonym for \ref dequeue)
229         value_type * pop()
230         {
231             return dequeue();
232         }
233
234         /// Clears the queue
235         /**
236             If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
237             will be called for each removed element.
238         */
239         void clear( bool bDispose = false )
240         {
241             fc_record * pRec = m_FlatCombining.acquire_record();
242
243             if ( c_bEliminationEnabled )
244                 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
245             else
246                 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
247
248             assert( pRec->is_done() );
249             m_FlatCombining.release_record( pRec );
250         }
251
252         /// Returns the number of elements in the queue.
253         /**
254             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
255             combining record can be in process.
256             To check emptiness use \ref empty function.
257         */
258         size_t size() const
259         {
260             return m_Queue.size();
261         }
262
263         /// Checks if the queue is empty
264         /**
265             If the combining is in process the function waits while it is done.
266         */
267         bool empty() const
268         {
269             m_FlatCombining.wait_while_combining();
270             return m_Queue.empty();
271         }
272
273         /// Internal statistics
274         stat const& statistics() const
275         {
276             return m_FlatCombining.statistics();
277         }
278
279     public: // flat combining cooperation, not for direct use!
280         //@cond
281         /// Flat combining supporting function. Do not call it directly!
282         /**
283             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
284             object if the current thread becomes a combiner. Invocation of the function means that
285             the queue should perform an action recorded in \p pRec.
286         */
287         void fc_apply( fc_record * pRec )
288         {
289             assert( pRec );
290
291             // this function is called under FC mutex, so switch TSan off
292             // All TSan warnings are false positive
293             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
294
295             switch ( pRec->op() ) {
296             case op_enq:
297                 assert( pRec->pVal );
298                 m_Queue.push_back( *(pRec->pVal ) );
299                 break;
300             case op_deq:
301                 pRec->bEmpty = m_Queue.empty();
302                 if ( !pRec->bEmpty ) {
303                     pRec->pVal = &m_Queue.front();
304                     m_Queue.pop_front();
305                 }
306                 break;
307             case op_clear:
308                 m_Queue.clear();
309                 break;
310             case op_clear_and_dispose:
311                 m_Queue.clear_and_dispose( disposer() );
312                 break;
313             default:
314                 assert(false);
315                 break;
316             }
317             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
318         }
319
320         /// Batch-processing flat combining
321         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
322         {
323             // this function is called under FC mutex, so switch TSan off
324             // All TSan warnings are false positive
325             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
326
327             typedef typename fc_kernel::iterator fc_iterator;
328             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
329                 switch ( it->op() ) {
330                 case op_enq:
331                 case op_deq:
332                     if ( m_Queue.empty() ) {
333                         if ( itPrev != itEnd && collide( *itPrev, *it ))
334                             itPrev = itEnd;
335                         else
336                             itPrev = it;
337                     }
338                     break;
339                 }
340             }
341             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
342         }
343         //@endcond
344
345     private:
346         //@cond
347         bool collide( fc_record& rec1, fc_record& rec2 )
348         {
349             assert( m_Queue.empty() );
350
351             switch ( rec1.op() ) {
352                 case op_enq:
353                     if ( rec2.op() == op_deq ) {
354                         assert(rec1.pVal);
355                         rec2.pVal = rec1.pVal;
356                         rec2.bEmpty = false;
357                         m_FlatCombining.operation_done( rec1 );
358                         m_FlatCombining.operation_done( rec2 );
359                         m_FlatCombining.internal_statistics().onCollide();
360                         return true;
361                     }
362                     break;
363                 case op_deq:
364                     if ( rec2.op() == op_enq ) {
365                         assert(rec2.pVal);
366                         rec1.pVal = rec2.pVal;
367                         rec1.bEmpty = false;
368                         m_FlatCombining.operation_done( rec1 );
369                         m_FlatCombining.operation_done( rec2 );
370                         m_FlatCombining.internal_statistics().onCollide();
371                         return true;
372                     }
373                     break;
374             }
375             return false;
376         }
377         //@endcond
378     };
379
380 }} // namespace cds::intrusive
381
382 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H