[UBsan] Added proper alignment for EllenBinTree node
[libcds.git] / cds / intrusive / fcqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H
32 #define CDSLIB_INTRUSIVE_FCQUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <cds/intrusive/options.h>
37 #include <boost/intrusive/list.hpp>
38
39 namespace cds { namespace intrusive {
40
41     /// \p FCQueue related definitions
42     namespace fcqueue {
43
44         /// \p FCQueue internal statistics
45         template <typename Counter = cds::atomicity::event_counter >
46         struct stat: public cds::algo::flat_combining::stat<Counter>
47         {
48             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
49             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
50
51             counter_type    m_nEnqueue     ;   ///< Count of push operations
52             counter_type    m_nDequeue     ;   ///< Count of success pop operations
53             counter_type    m_nFailedDeq   ;   ///< Count of failed pop operations (pop from empty queue)
54             counter_type    m_nCollided    ;   ///< How many pairs of push/pop were collided, if elimination is enabled
55
56             //@cond
57             void    onEnqueue()                 { ++m_nEnqueue; }
58             void    onDequeue( bool bFailed )   { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue;  }
59             void    onCollide()                 { ++m_nCollided; }
60             //@endcond
61         };
62
63         /// FCQueue dummy statistics, no overhead
64         struct empty_stat: public cds::algo::flat_combining::empty_stat
65         {
66             //@cond
67             void    onEnqueue()     {}
68             void    onDequeue(bool) {}
69             void    onCollide()     {}
70             //@endcond
71         };
72
73         /// \p FCQueue type traits
74         struct traits: public cds::algo::flat_combining::traits
75         {
76             typedef cds::intrusive::opt::v::empty_disposer  disposer ; ///< Disposer to erase removed elements. Used only in \p FCQueue::clear() function
77             typedef empty_stat      stat;   ///< Internal statistics
78             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
79         };
80
81         /// Metafunction converting option list to traits
82         /**
83             \p Options are:
84             - any \p cds::algo::flat_combining::make_traits options
85             - \p opt::disposer - the functor used to dispose removed items. Default is \p opt::intrusive::v::empty_disposer.
86                 This option is used only in \p FCQueue::clear() function.
87             - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
88             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
89                 By default, the elimination is disabled (\p false)
90         */
91         template <typename... Options>
92         struct make_traits {
93 #   ifdef CDS_DOXYGEN_INVOKED
94             typedef implementation_defined type ;   ///< Metafunction result
95 #   else
96             typedef typename cds::opt::make_options<
97                 typename cds::opt::find_type_traits< traits, Options... >::type
98                 ,Options...
99             >::type   type;
100 #   endif
101         };
102     } // namespace fcqueue
103
104     /// Flat-combining intrusive queue
105     /**
106         @ingroup cds_intrusive_queue
107         @ingroup cds_flat_combining_intrusive
108
109         \ref cds_flat_combining_description "Flat combining" sequential intrusive queue.
110
111         Template parameters:
112         - \p T - a value type stored in the queue
113         - \p Container - sequential intrusive container with \p push_back and \p pop_front functions.
114             Default is \p boost::intrusive::list
115         - \p Traits - type traits of flat combining, default is \p fcqueue::traits.
116             \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization
117     */
118     template <typename T
119         ,class Container = boost::intrusive::list<T>
120         ,typename Traits = fcqueue::traits
121     >
122     class FCQueue
123 #ifndef CDS_DOXYGEN_INVOKED
124         : public cds::algo::flat_combining::container
125 #endif
126     {
127     public:
128         typedef T           value_type;     ///< Value type
129         typedef Container   container_type; ///< Sequential container type
130         typedef Traits      traits;         ///< Queue traits
131
132         typedef typename traits::disposer   disposer;   ///< The disposer functor. The disposer is used only in \ref clear() function
133         typedef typename traits::stat       stat;   ///< Internal statistics type
134         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
135
136     protected:
137         //@cond
138         /// Queue operation IDs
139         enum fc_operation {
140             op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
141             op_deq,                 ///< Dequeue
142             op_clear,               ///< Clear
143             op_clear_and_dispose    ///< Clear and dispose
144         };
145
146         /// Flat combining publication list record
147         struct fc_record: public cds::algo::flat_combining::publication_record
148         {
149             value_type * pVal;  ///< Value to enqueue or dequeue
150             bool         bEmpty; ///< \p true if the queue is empty
151         };
152         //@endcond
153
154         /// Flat combining kernel
155         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
156
157     protected:
158         //@cond
159         mutable fc_kernel m_FlatCombining;
160         container_type    m_Queue;
161         //@endcond
162
163     public:
164         /// Initializes empty queue object
165         FCQueue()
166         {}
167
168         /// Initializes empty queue object and gives flat combining parameters
169         FCQueue(
170             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
171             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
172             )
173             : m_FlatCombining( nCompactFactor, nCombinePassCount )
174         {}
175
176         /// Inserts a new element at the end of the queue
177         /**
178             The function always returns \p true.
179         */
180         bool enqueue( value_type& val )
181         {
182             auto pRec = m_FlatCombining.acquire_record();
183             pRec->pVal = &val;
184
185             if ( c_bEliminationEnabled )
186                 m_FlatCombining.batch_combine( op_enq, pRec, *this );
187             else
188                 m_FlatCombining.combine( op_enq, pRec, *this );
189
190             assert( pRec->is_done());
191             m_FlatCombining.release_record( pRec );
192             m_FlatCombining.internal_statistics().onEnqueue();
193             return true;
194         }
195
196         /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
197         bool push( value_type& val )
198         {
199             return enqueue( val );
200         }
201
202         /// Removes the next element from the queue
203         /**
204             If the queue is empty the function returns \p nullptr
205         */
206         value_type * dequeue()
207         {
208             auto pRec = m_FlatCombining.acquire_record();
209             pRec->pVal = nullptr;
210
211             if ( c_bEliminationEnabled )
212                 m_FlatCombining.batch_combine( op_deq, pRec, *this );
213             else
214                 m_FlatCombining.combine( op_deq, pRec, *this );
215
216             assert( pRec->is_done());
217             m_FlatCombining.release_record( pRec );
218
219             m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
220             return pRec->pVal;
221         }
222
223         /// Removes the next element from the queue (a synonym for \ref dequeue)
224         value_type * pop()
225         {
226             return dequeue();
227         }
228
229         /// Clears the queue
230         /**
231             If \p bDispose is \p true, the disposer provided in \p Traits class' template parameter
232             will be called for each removed element.
233         */
234         void clear( bool bDispose = false )
235         {
236             auto pRec = m_FlatCombining.acquire_record();
237
238             if ( c_bEliminationEnabled )
239                 m_FlatCombining.batch_combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
240             else
241                 m_FlatCombining.combine( bDispose ? op_clear_and_dispose : op_clear, pRec, *this );
242
243             assert( pRec->is_done());
244             m_FlatCombining.release_record( pRec );
245         }
246
247         /// Returns the number of elements in the queue.
248         /**
249             Note that <tt>size() == 0</tt> is not mean that the queue is empty because
250             combining record can be in process.
251             To check emptiness use \ref empty function.
252         */
253         size_t size() const
254         {
255             return m_Queue.size();
256         }
257
258         /// Checks if the queue is empty
259         /**
260             If the combining is in process the function waits while it is done.
261         */
262         bool empty() const
263         {
264             bool bRet = false;
265             auto const& queue = m_Queue;
266             m_FlatCombining.invoke_exclusive([&queue, &bRet]() { bRet = queue.empty(); });
267             return bRet;
268         }
269
270         /// Internal statistics
271         stat const& statistics() const
272         {
273             return m_FlatCombining.statistics();
274         }
275
276     public: // flat combining cooperation, not for direct use!
277         //@cond
278         /// Flat combining supporting function. Do not call it directly!
279         /**
280             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
281             object if the current thread becomes a combiner. Invocation of the function means that
282             the queue should perform an action recorded in \p pRec.
283         */
284         void fc_apply( fc_record * pRec )
285         {
286             assert( pRec );
287
288             // this function is called under FC mutex, so switch TSan off
289             // All TSan warnings are false positive
290             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
291
292             switch ( pRec->op()) {
293             case op_enq:
294                 assert( pRec->pVal );
295                 m_Queue.push_back( *(pRec->pVal ));
296                 break;
297             case op_deq:
298                 pRec->bEmpty = m_Queue.empty();
299                 if ( !pRec->bEmpty ) {
300                     pRec->pVal = &m_Queue.front();
301                     m_Queue.pop_front();
302                 }
303                 break;
304             case op_clear:
305                 m_Queue.clear();
306                 break;
307             case op_clear_and_dispose:
308                 m_Queue.clear_and_dispose( disposer());
309                 break;
310             default:
311                 assert(false);
312                 break;
313             }
314             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
315         }
316
317         /// Batch-processing flat combining
318         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
319         {
320             // this function is called under FC mutex, so switch TSan off
321             // All TSan warnings are false positive
322             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
323
324             typedef typename fc_kernel::iterator fc_iterator;
325             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
326                 switch ( it->op()) {
327                 case op_enq:
328                 case op_deq:
329                     if ( m_Queue.empty()) {
330                         if ( itPrev != itEnd && collide( *itPrev, *it ))
331                             itPrev = itEnd;
332                         else
333                             itPrev = it;
334                     }
335                     break;
336                 }
337             }
338             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
339         }
340         //@endcond
341
342     private:
343         //@cond
344         bool collide( fc_record& rec1, fc_record& rec2 )
345         {
346             assert( m_Queue.empty());
347
348             switch ( rec1.op()) {
349                 case op_enq:
350                     if ( rec2.op() == op_deq ) {
351                         assert(rec1.pVal);
352                         rec2.pVal = rec1.pVal;
353                         rec2.bEmpty = false;
354                         m_FlatCombining.operation_done( rec1 );
355                         m_FlatCombining.operation_done( rec2 );
356                         m_FlatCombining.internal_statistics().onCollide();
357                         return true;
358                     }
359                     break;
360                 case op_deq:
361                     if ( rec2.op() == op_enq ) {
362                         assert(rec2.pVal);
363                         rec1.pVal = rec2.pVal;
364                         rec1.bEmpty = false;
365                         m_FlatCombining.operation_done( rec1 );
366                         m_FlatCombining.operation_done( rec2 );
367                         m_FlatCombining.internal_statistics().onCollide();
368                         return true;
369                     }
370                     break;
371             }
372             return false;
373         }
374         //@endcond
375     };
376
377 }} // namespace cds::intrusive
378
379 #endif // #ifndef CDSLIB_INTRUSIVE_FCQUEUE_H