fixed empy function in flat combiner containers
[libcds.git] / cds / container / fcpriority_queue.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
4 #define CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
5
6 #include <cds/algo/flat_combining.h>
7 #include <cds/algo/elimination_opt.h>
8 #include <queue>
9
10 namespace cds { namespace container {
11
12     /// FCPriorityQueue related definitions
13     /** @ingroup cds_nonintrusive_helper
14     */
15     namespace fcpqueue {
16
17         /// FCPriorityQueue internal statistics
18         template <typename Counter = cds::atomicity::event_counter >
19         struct stat: public cds::algo::flat_combining::stat<Counter>
20         {
21             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
22             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
23
24             counter_type    m_nPush     ;  ///< Count of push operations
25             counter_type    m_nPushMove ;  ///< Count of push operations with move semantics
26             counter_type    m_nPop      ;  ///< Count of success pop operations
27             counter_type    m_nFailedPop;  ///< Count of failed pop operations (pop from empty queue)
28
29             //@cond
30             void    onPush()             { ++m_nPush; }
31             void    onPushMove()         { ++m_nPushMove; }
32             void    onPop( bool bFailed ) { if ( bFailed ) ++m_nFailedPop; else ++m_nPop;  }
33             //@endcond
34         };
35
36         /// FCPriorityQueue dummy statistics, no overhead
37         struct empty_stat: public cds::algo::flat_combining::empty_stat
38         {
39             //@cond
40             void    onPush()       {}
41             void    onPushMove()   {}
42             void    onPop(bool)    {}
43             //@endcond
44         };
45
46         /// FCPriorityQueue traits
47         struct traits: public cds::algo::flat_combining::traits
48         {
49             typedef empty_stat      stat;   ///< Internal statistics
50         };
51
52         /// Metafunction converting option list to traits
53         /**
54             \p Options are:
55             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
56             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
57             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
58             - \p opt::stat - internal statistics, possible type: \p fcpqueue::stat, \p fcpqueue::empty_stat (the default)
59             - \p opt::memory_model - C++ memory ordering model.
60                 List of all available memory ordering see \p opt::memory_model.
61                 Default is \p cds::opt::v:relaxed_ordering
62         */
63         template <typename... Options>
64         struct make_traits {
65 #   ifdef CDS_DOXYGEN_INVOKED
66             typedef implementation_defined type ;   ///< Metafunction result
67 #   else
68             typedef typename cds::opt::make_options<
69                 typename cds::opt::find_type_traits< traits, Options... >::type
70                 ,Options...
71             >::type   type;
72 #   endif
73         };
74
75     } // namespace fcpqueue
76
77     /// Flat-combining priority queue
78     /**
79         @ingroup cds_nonintrusive_priority_queue
80         @ingroup cds_flat_combining_container
81
82         \ref cds_flat_combining_description "Flat combining" sequential priority queue.
83         The class can be considered as a concurrent FC-based wrapper for \p std::priority_queue.
84
85         Template parameters:
86         - \p T - a value type stored in the queue
87         - \p PriorityQueue - sequential priority queue implementation, default is \p std::priority_queue<T>
88         - \p Traits - type traits of flat combining, default is \p fcpqueue::traits.
89             \p fcpqueue::make_traits metafunction can be used to construct specialized \p %fcpqueue::traits
90     */
91     template <typename T,
92         class PriorityQueue = std::priority_queue<T>,
93         typename Traits = fcpqueue::traits
94     >
95     class FCPriorityQueue
96 #ifndef CDS_DOXYGEN_INVOKED
97         : public cds::algo::flat_combining::container
98 #endif
99     {
100     public:
101         typedef T               value_type;          ///< Value type
102         typedef PriorityQueue   priority_queue_type; ///< Sequential priority queue class
103         typedef Traits          traits;              ///< Priority queue type traits
104
105         typedef typename traits::stat  stat;    ///< Internal statistics type
106
107     protected:
108         //@cond
109         // Priority queue operation IDs
110         enum fc_operation {
111             op_push = cds::algo::flat_combining::req_Operation,
112             op_push_move,
113             op_pop,
114             op_clear,
115             op_empty
116         };
117
118         // Flat combining publication list record
119         struct fc_record: public cds::algo::flat_combining::publication_record
120         {
121             union {
122                 value_type const *  pValPush; // Value to push
123                 value_type *        pValPop;  // Pop destination
124             };
125             bool            bEmpty; // true if the queue is empty
126         };
127         //@endcond
128
129         /// Flat combining kernel
130         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
131
132     protected:
133         //@cond
134         fc_kernel               m_FlatCombining;
135         priority_queue_type     m_PQueue;
136         //@endcond
137
138     public:
139         /// Initializes empty priority queue object
140         FCPriorityQueue()
141         {}
142
143         /// Initializes empty priority queue object and gives flat combining parameters
144         FCPriorityQueue(
145             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
146             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
147             )
148             : m_FlatCombining( nCompactFactor, nCombinePassCount )
149         {}
150
151         /// Inserts a new element in the priority queue
152         /**
153             The function always returns \p true
154         */
155         bool push(
156             value_type const& val ///< Value to be copied to inserted element
157         )
158         {
159             fc_record * pRec = m_FlatCombining.acquire_record();
160             pRec->pValPush = &val;
161
162             m_FlatCombining.combine( op_push, pRec, *this );
163
164             assert( pRec->is_done() );
165             m_FlatCombining.release_record( pRec );
166             m_FlatCombining.internal_statistics().onPush();
167             return true;
168         }
169
170         /// Inserts a new element in the priority queue (move semantics)
171         /**
172             The function always returns \p true
173         */
174         bool push(
175             value_type&& val ///< Value to be moved to inserted element
176         )
177         {
178             fc_record * pRec = m_FlatCombining.acquire_record();
179             pRec->pValPush = &val;
180
181             m_FlatCombining.combine( op_push_move, pRec, *this );
182
183             assert( pRec->is_done() );
184             m_FlatCombining.release_record( pRec );
185             m_FlatCombining.internal_statistics().onPushMove();
186             return true;
187         }
188
189         /// Removes the top element from priority queue
190         /**
191             The function returns \p false if the queue is empty, \p true otherwise.
192             If the queue is empty \p val is not changed.
193         */
194         bool pop(
195             value_type& val ///< Target to be received the copy of top element
196         )
197         {
198             fc_record * pRec = m_FlatCombining.acquire_record();
199             pRec->pValPop = &val;
200
201             m_FlatCombining.combine( op_pop, pRec, *this );
202
203             assert( pRec->is_done() );
204             m_FlatCombining.release_record( pRec );
205             m_FlatCombining.internal_statistics().onPop( pRec->bEmpty );
206             return !pRec->bEmpty;
207         }
208
209         /// Clears the priority queue
210         void clear()
211         {
212             fc_record * pRec = m_FlatCombining.acquire_record();
213
214            m_FlatCombining.combine( op_clear, pRec, *this );
215
216             assert( pRec->is_done() );
217             m_FlatCombining.release_record( pRec );
218         }
219
220         /// Returns the number of elements in the priority queue.
221         /**
222             Note that <tt>size() == 0</tt> does not mean that the queue is empty because
223             combining record can be in process.
224             To check emptiness use \ref empty function.
225         */
226         size_t size() const
227         {
228             return m_PQueue.size();
229         }
230
231         /// Checks if the priority queue is empty
232         /**
233             If the combining is in process the function waits while combining done.
234         */
235         bool empty()
236         {
237             fc_record * pRec = m_FlatCombining.acquire_record();
238
239             m_FlatCombining.combine( op_empty, pRec, *this );
240             assert( pRec->is_done() );
241             m_FlatCombining.release_record( pRec );
242             return pRec->bEmpty;
243         }
244
245         /// Internal statistics
246         stat const& statistics() const
247         {
248             return m_FlatCombining.statistics();
249         }
250
251     public: // flat combining cooperation, not for direct use!
252         //@cond
253         /*
254             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
255             object if the current thread becomes a combiner. Invocation of the function means that
256             the priority queue should perform an action recorded in \p pRec.
257         */
258         void fc_apply( fc_record * pRec )
259         {
260             assert( pRec );
261
262             switch ( pRec->op() ) {
263             case op_push:
264                 assert( pRec->pValPush );
265                 m_PQueue.push( *(pRec->pValPush) );
266                 break;
267             case op_push_move:
268                 assert( pRec->pValPush );
269                 m_PQueue.push( std::move( *(pRec->pValPush )) );
270                 break;
271             case op_pop:
272                 assert( pRec->pValPop );
273                 pRec->bEmpty = m_PQueue.empty();
274                 if ( !pRec->bEmpty ) {
275                     *(pRec->pValPop) = m_PQueue.top();
276                     m_PQueue.pop();
277                 }
278                 break;
279             case op_clear:
280                 while ( !m_PQueue.empty() )
281                     m_PQueue.pop();
282                 break;
283             case op_empty:
284                 pRec->bEmpty = m_PQueue.empty();
285                 break;
286             default:
287                 assert(false);
288                 break;
289             }
290         }
291         //@endcond
292     };
293
294 }} // namespace cds::container
295
296 #endif // #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H