Added wait strategies to flat combining technique
[libcds.git] / cds / container / fcpriority_queue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
32 #define CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <queue>
37
38 namespace cds { namespace container {
39
40     /// FCPriorityQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcpqueue {
44
45         /// FCPriorityQueue internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nPush     ;  ///< Count of push operations
53             counter_type    m_nPushMove ;  ///< Count of push operations with move semantics
54             counter_type    m_nPop      ;  ///< Count of success pop operations
55             counter_type    m_nFailedPop;  ///< Count of failed pop operations (pop from empty queue)
56
57             //@cond
58             void    onPush()             { ++m_nPush; }
59             void    onPushMove()         { ++m_nPushMove; }
60             void    onPop( bool bFailed ) { if ( bFailed ) ++m_nFailedPop; else ++m_nPop;  }
61             //@endcond
62         };
63
64         /// FCPriorityQueue dummy statistics, no overhead
65         struct empty_stat: public cds::algo::flat_combining::empty_stat
66         {
67             //@cond
68             void    onPush()       {}
69             void    onPushMove()   {}
70             void    onPop(bool)    {}
71             //@endcond
72         };
73
74         /// FCPriorityQueue traits
75         struct traits: public cds::algo::flat_combining::traits
76         {
77             typedef empty_stat      stat;   ///< Internal statistics
78         };
79
80         /// Metafunction converting option list to traits
81         /**
82             \p Options are:
83             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
84             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
85             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
86             - \p opt::stat - internal statistics, possible type: \p fcpqueue::stat, \p fcpqueue::empty_stat (the default)
87             - \p opt::memory_model - C++ memory ordering model.
88                 List of all available memory ordering see \p opt::memory_model.
89                 Default is \p cds::opt::v:relaxed_ordering
90         */
91         template <typename... Options>
92         struct make_traits {
93 #   ifdef CDS_DOXYGEN_INVOKED
94             typedef implementation_defined type ;   ///< Metafunction result
95 #   else
96             typedef typename cds::opt::make_options<
97                 typename cds::opt::find_type_traits< traits, Options... >::type
98                 ,Options...
99             >::type   type;
100 #   endif
101         };
102
103     } // namespace fcpqueue
104
105     /// Flat-combining priority queue
106     /**
107         @ingroup cds_nonintrusive_priority_queue
108         @ingroup cds_flat_combining_container
109
110         \ref cds_flat_combining_description "Flat combining" sequential priority queue.
111         The class can be considered as a concurrent FC-based wrapper for \p std::priority_queue.
112
113         Template parameters:
114         - \p T - a value type stored in the queue
115         - \p PriorityQueue - sequential priority queue implementation, default is \p std::priority_queue<T>
116         - \p Traits - type traits of flat combining, default is \p fcpqueue::traits.
117             \p fcpqueue::make_traits metafunction can be used to construct specialized \p %fcpqueue::traits
118     */
119     template <typename T,
120         class PriorityQueue = std::priority_queue<T>,
121         typename Traits = fcpqueue::traits
122     >
123     class FCPriorityQueue
124 #ifndef CDS_DOXYGEN_INVOKED
125         : public cds::algo::flat_combining::container
126 #endif
127     {
128     public:
129         typedef T               value_type;          ///< Value type
130         typedef PriorityQueue   priority_queue_type; ///< Sequential priority queue class
131         typedef Traits          traits;              ///< Priority queue type traits
132
133         typedef typename traits::stat  stat;    ///< Internal statistics type
134
135     protected:
136         //@cond
137         // Priority queue operation IDs
138         enum fc_operation {
139             op_push = cds::algo::flat_combining::req_Operation,
140             op_push_move,
141             op_pop,
142             op_clear,
143             op_empty
144         };
145
146         // Flat combining publication list record
147         struct fc_record: public cds::algo::flat_combining::publication_record
148         {
149             union {
150                 value_type const *  pValPush; // Value to push
151                 value_type *        pValPop;  // Pop destination
152             };
153             bool            bEmpty; // true if the queue is empty
154         };
155         //@endcond
156
157         /// Flat combining kernel
158         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
159
160     protected:
161         //@cond
162         fc_kernel               m_FlatCombining;
163         priority_queue_type     m_PQueue;
164         //@endcond
165
166     public:
167         /// Initializes empty priority queue object
168         FCPriorityQueue()
169         {}
170
171         /// Initializes empty priority queue object and gives flat combining parameters
172         FCPriorityQueue(
173             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
174             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
175             )
176             : m_FlatCombining( nCompactFactor, nCombinePassCount )
177         {}
178
179         /// Inserts a new element in the priority queue
180         /**
181             The function always returns \p true
182         */
183         bool push(
184             value_type const& val ///< Value to be copied to inserted element
185         )
186         {
187             fc_record * pRec = m_FlatCombining.acquire_record();
188             pRec->pValPush = &val;
189
190             m_FlatCombining.combine( op_push, pRec, *this );
191
192             assert( pRec->is_done() );
193             m_FlatCombining.release_record( pRec );
194             m_FlatCombining.internal_statistics().onPush();
195             return true;
196         }
197
198         /// Inserts a new element in the priority queue (move semantics)
199         /**
200             The function always returns \p true
201         */
202         bool push(
203             value_type&& val ///< Value to be moved to inserted element
204         )
205         {
206             fc_record * pRec = m_FlatCombining.acquire_record();
207             pRec->pValPush = &val;
208
209             m_FlatCombining.combine( op_push_move, pRec, *this );
210
211             assert( pRec->is_done() );
212             m_FlatCombining.release_record( pRec );
213             m_FlatCombining.internal_statistics().onPushMove();
214             return true;
215         }
216
217         /// Removes the top element from priority queue
218         /**
219             The function returns \p false if the queue is empty, \p true otherwise.
220             If the queue is empty \p val is not changed.
221         */
222         bool pop(
223             value_type& val ///< Target to be received the copy of top element
224         )
225         {
226             fc_record * pRec = m_FlatCombining.acquire_record();
227             pRec->pValPop = &val;
228
229             m_FlatCombining.combine( op_pop, pRec, *this );
230
231             assert( pRec->is_done() );
232             m_FlatCombining.release_record( pRec );
233             m_FlatCombining.internal_statistics().onPop( pRec->bEmpty );
234             return !pRec->bEmpty;
235         }
236
237         /// Clears the priority queue
238         void clear()
239         {
240             fc_record * pRec = m_FlatCombining.acquire_record();
241
242            m_FlatCombining.combine( op_clear, pRec, *this );
243
244             assert( pRec->is_done() );
245             m_FlatCombining.release_record( pRec );
246         }
247
248         /// Returns the number of elements in the priority queue.
249         /**
250             Note that <tt>size() == 0</tt> does not mean that the queue is empty because
251             combining record can be in process.
252             To check emptiness use \ref empty function.
253         */
254         size_t size() const
255         {
256             return m_PQueue.size();
257         }
258
259         /// Checks if the priority queue is empty
260         /**
261             If the combining is in process the function waits while combining done.
262         */
263         bool empty()
264         {
265             fc_record * pRec = m_FlatCombining.acquire_record();
266
267             m_FlatCombining.combine( op_empty, pRec, *this );
268             assert( pRec->is_done() );
269             m_FlatCombining.release_record( pRec );
270             return pRec->bEmpty;
271         }
272
273         /// Internal statistics
274         stat const& statistics() const
275         {
276             return m_FlatCombining.statistics();
277         }
278
279     public: // flat combining cooperation, not for direct use!
280         //@cond
281         /*
282             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
283             object if the current thread becomes a combiner. Invocation of the function means that
284             the priority queue should perform an action recorded in \p pRec.
285         */
286         void fc_apply( fc_record * pRec )
287         {
288             assert( pRec );
289
290             // this function is called under FC mutex, so switch TSan off
291             CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
292
293             switch ( pRec->op() ) {
294             case op_push:
295                 assert( pRec->pValPush );
296                 m_PQueue.push( *(pRec->pValPush) );
297                 break;
298             case op_push_move:
299                 assert( pRec->pValPush );
300                 m_PQueue.push( std::move( *(pRec->pValPush )) );
301                 break;
302             case op_pop:
303                 assert( pRec->pValPop );
304                 pRec->bEmpty = m_PQueue.empty();
305                 if ( !pRec->bEmpty ) {
306                     *(pRec->pValPop) = std::move( m_PQueue.top());
307                     m_PQueue.pop();
308                 }
309                 break;
310             case op_clear:
311                 while ( !m_PQueue.empty() )
312                     m_PQueue.pop();
313                 break;
314             case op_empty:
315                 pRec->bEmpty = m_PQueue.empty();
316                 break;
317             default:
318                 assert(false);
319                 break;
320             }
321
322             CDS_TSAN_ANNOTATE_IGNORE_RW_END;
323         }
324         //@endcond
325     };
326
327 }} // namespace cds::container
328
329 #endif // #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H