replace null_ptr<>() with nullptr
[libcds.git] / cds / intrusive / fcqueue.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_INTRUSIVE_FCQUEUE_H
4 #define __CDS_INTRUSIVE_FCQUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <cds/intrusive/options.h>
9 #include <boost/intrusive/list.hpp>
10
11 namespace cds { namespace intrusive {
12
13     /// FCQueue related definitions
14     namespace fcqueue {
15
16         /// FCQueue internal statistics
17         template <typename Counter = cds::atomicity::event_counter >
18         struct stat: public cds::algo::flat_combining::stat<Counter>
19         {
20             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
21             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
22
23             counter_type    m_nEnqueue     ;   ///< Count of push operations
24             counter_type    m_nDequeue     ;   ///< Count of success pop operations
25             counter_type    m_nFailedDeq   ;   ///< Count of failed pop operations (pop from empty queue)
26             counter_type    m_nCollided    ;   ///< How many pairs of push/pop were collided, if elimination is enabled
27
28             //@cond
29             void    onEnqueue()                 { ++m_nEnqueue; }
30             void    onDequeue( bool bFailed )   { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
31             void    onCollide()                 { ++m_nCollided; }
32             //@endcond
33         };
34
35         /// FCQueue dummy statistics, no overhead
36         struct empty_stat: public cds::algo::flat_combining::empty_stat
37         {
38             //@cond
39             void    onEnqueue()     {}
40             void    onDequeue(bool) {}
41             void    onCollide()     {}
42             //@endcond
43         };
44
45         /// FCQueue type traits
46         struct type_traits: public cds::algo::flat_combining::type_traits
47         {
48             typedef cds::intrusive::opt::v::empty_disposer  disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
49             typedef empty_stat      stat;   ///< Internal statistics
50             static CDS_CONSTEXPR_CONST bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
51         };
52
53         /// Metafunction converting option list to traits
54         /**
55             This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
56             \p Options are:
57             - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
58             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::Default
59             - \p opt::disposer - the functor used for dispose removed items. Default is opt::intrusive::v::empty_disposer.
60                 This option is used only in \p FCQueue::clear() function.
61             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
62             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
63             - \p opt::memory_model - C++ memory ordering model.
64                 List of all available memory ordering see opt::memory_model.
65                 Default if cds::opt::v:relaxed_ordering
66             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
67                 By default, the elimination is disabled.
68         */
69         template <CDS_DECL_OPTIONS8>
70         struct make_traits {
71 #   ifdef CDS_DOXYGEN_INVOKED
72             typedef implementation_defined type ;   ///< Metafunction result
73 #   else
74             typedef typename cds::opt::make_options<
75                 typename cds::opt::find_type_traits< type_traits, CDS_OPTIONS8 >::type
76                 ,CDS_OPTIONS8
77             >::type   type;
78 #   endif
79         };
80     } // namespace fcqueue
81
82     /// Flat-combining intrusive queue
83     /**
84         @ingroup cds_intrusive_queue
85         @ingroup cds_flat_combining_intrusive
86
87         \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
88
89         Template parameters:
90         - \p T - a value type stored in the queue
91         - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
92             Default is \p boost::intrusive::list
93         - \p Traits - type traits of flat combining, default is \p fcqueue::type_traits.
94             \p fcqueue::make_traits metafunction can be used to construct specialized \p %type_traits
95     */
96     template <typename T
97         ,class Container = boost::intrusive::list<T>
98         ,typename Traits = fcqueue::type_traits
99     >
100     class FCQueue
101 #ifndef CDS_DOXYGEN_INVOKED
102         : public cds::algo::flat_combining::container
103 #endif
104     {
105     public:
106         typedef T           value_type;     ///< Value type
107         typedef Container   container_type; ///< Sequential container type
108         typedef Traits      type_traits;    ///< Queue type traits
109
110         typedef typename type_traits::disposer  disposer;   ///< The disposer functor. The disposer is used only in \ref clear() function
111         typedef typename type_traits::stat  stat;   ///< Internal statistics type
112         static CDS_CONSTEXPR_CONST bool c_bEliminationEnabled = type_traits::enable_elimination; ///< \p true if elimination is enabled
113
114     protected:
115         //@cond
116         /// Queue operation IDs
117         enum fc_operation {
118             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
119             op_deq,                 ///< Dequeue
120             op_clear,               ///< Clear
121             op_clear_and_dispose    ///< Clear and dispose
122         };
123
124         /// Flat combining publication list record
125         struct fc_record: public cds::algo::flat_combining::publication_record
126         {
127             value_type * pVal;  ///< Value to enqueue or dequeue
128             bool         bEmpty; ///< \p true if the queue is empty
129         };
130         //@endcond
131
132         /// Flat combining kernel
133         typedef cds::algo::flat_combining::kernel< fc_record, type_traits > fc_kernel;
134
135     protected:
136         //@cond
137         fc_kernel       m_FlatCombining;
138         container_type  m_Queue;
139         //@endcond
140
141     public:
142         /// Initializes empty queue object
143         FCQueue()
144         {}
145
146         /// Initializes empty queue object and gives flat combining parameters
147         FCQueue(
148             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
149             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
150             )
151             : m_FlatCombining( nCompactFactor, nCombinePassCount )
152         {}
153
154         /// Inserts a new element at the end of the queue
155         /**
156             The function always returns \p true.
157         */
158         bool enqueue( value_type& val )
159         {
160             fc_record * pRec = m_FlatCombining.acquire_record();
161             pRec->pVal = &val;
162
163             if ( c_bEliminationEnabled )
164                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
165             else
166                 m_FlatCombining.combine( op_enq, pRec, *this );
167
168             assert( pRec->is_done() );
169             m_FlatCombining.release_record( pRec );
170             m_FlatCombining.internal_statistics().onEnqueue();
171             return true;
172         }
173
174         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
175         bool push( value_type& val )
176         {
177             return enqueue( val );
178         }
179
180         /// Removes the next element from the queue
181         /**
182             If the queue is empty the function returns \p nullptr
183         */
184         value_type * dequeue()
185         {
186             fc_record * pRec = m_FlatCombining.acquire_record();
187             pRec->pVal = nullptr;
188
189             if ( c_bEliminationEnabled )
190                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
191             else
192                 m_FlatCombining.combine( op_deq, pRec, *this );
193
194             assert( pRec->is_done() );
195             m_FlatCombining.release_record( pRec );
196
197             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
198             return pRec->pVal;
199         }
200
201         /// Removes the next element from the queue (a synonym for \ref dequeue)
202         value_type * pop()
203         {
204             return dequeue();
205         }
206
207         /// Clears the queue
208         /**
209             If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
210             will be called for each removed element.
211         */
212         void clear( bool bDispose = false )
213         {
214             fc_record * pRec = m_FlatCombining.acquire_record();
215
216             if ( c_bEliminationEnabled )
217                 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
218             else
219                 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
220
221             assert( pRec->is_done() );
222             m_FlatCombining.release_record( pRec );
223         }
224
225         /// Returns the number of elements in the queue.
226         /**
227             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
228             combining record can be in process.
229             To check emptiness use \ref empty function.
230         */
231         size_t size() const
232         {
233             return m_Queue.size();
234         }
235
236         /// Checks if the queue is empty
237         /**
238             If the combining is in process the function waits while it is done.
239         */
240         bool empty() const
241         {
242             m_FlatCombining.wait_while_combining();
243             return m_Queue.empty();
244         }
245
246         /// Internal statistics
247         stat const& statistics() const
248         {
249             return m_FlatCombining.statistics();
250         }
251
252     public: // flat combining cooperation, not for direct use!
253         //@cond
254         /// Flat combining supporting function. Do not call it directly!
255         /**
256             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
257             object if the current thread becomes a combiner. Invocation of the function means that
258             the queue should perform an action recorded in \p pRec.
259         */
260         void fc_apply( fc_record * pRec )
261         {
262             assert( pRec );
263
264             switch ( pRec->op() ) {
265             case op_enq:
266                 assert( pRec->pVal );
267                 m_Queue.push_back( *(pRec->pVal ) );
268                 break;
269             case op_deq:
270                 pRec->bEmpty = m_Queue.empty();
271                 if ( !pRec->bEmpty ) {
272                     pRec->pVal = &m_Queue.front();
273                     m_Queue.pop_front();
274                 }
275                 break;
276             case op_clear:
277                 m_Queue.clear();
278                 break;
279             case op_clear_and_dispose:
280                 m_Queue.clear_and_dispose( disposer() );
281                 break;
282             default:
283                 assert(false);
284                 break;
285             }
286         }
287
288         /// Batch-processing flat combining
289         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
290         {
291             typedef typename fc_kernel::iterator fc_iterator;
292             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
293                 switch ( it->op() ) {
294                 case op_enq:
295                 case op_deq:
296                     if ( m_Queue.empty() ) {
297                         if ( itPrev != itEnd && collide( *itPrev, *it ))
298                             itPrev = itEnd;
299                         else
300                             itPrev = it;
301                     }
302                     break;
303                 }
304             }
305         }
306         //@endcond
307
308     private:
309         //@cond
310         bool collide( fc_record& rec1, fc_record& rec2 )
311         {
312             assert( m_Queue.empty() );
313
314             switch ( rec1.op() ) {
315                 case op_enq:
316                     if ( rec2.op() == op_deq ) {
317                         assert(rec1.pVal);
318                         rec2.pVal = rec1.pVal;
319                         rec2.bEmpty = false;
320                         m_FlatCombining.operation_done( rec1 );
321                         m_FlatCombining.operation_done( rec2 );
322                         m_FlatCombining.internal_statistics().onCollide();
323                         return true;
324                     }
325                     break;
326                 case op_deq:
327                     if ( rec2.op() == op_enq ) {
328                         assert(rec2.pVal);
329                         rec1.pVal = rec2.pVal;
330                         rec1.bEmpty = false;
331                         m_FlatCombining.operation_done( rec1 );
332                         m_FlatCombining.operation_done( rec2 );
333                         m_FlatCombining.internal_statistics().onCollide();
334                         return true;
335                     }
336                     break;
337             }
338             return false;
339         }
340         //@endcond
341     };
342
343 }} // namespace cds::intrusive
344
345 #endif // #ifndef __CDS_INTRUSIVE_FCQUEUE_H