Merged branch 'master' of https://github.com/Nemo1369/libcds
[libcds.git] / cds / container / rwqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_RWQUEUE_H
32 #define CDSLIB_CONTAINER_RWQUEUE_H
33
34 #include <mutex>        // unique_lock
35 #include <cds/sync/spinlock.h>
36 #include <cds/opt/options.h>
37 #include <cds/details/allocator.h>
38
39 namespace cds { namespace container {
40     /// RWQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace rwqueue {
44         /// RWQueue default type traits
45         struct traits
46         {
47             /// Lock policy
48             typedef cds::sync::spin  lock_type;
49
50             /// Node allocator
51             typedef CDS_DEFAULT_ALLOCATOR   allocator;
52
53             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
54             typedef cds::atomicity::empty_item_counter item_counter;
55
56             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
57             enum { padding = opt::cache_line_padding };
58         };
59
60         /// Metafunction converting option list to \p rwqueue::traits
61         /**
62             Supported \p Options are:
63             - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
64             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
65             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66                 To enable item counting use \p cds::atomicity::item_counter.
67             - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
68
69             Example: declare mutex-based \p %RWQueue with item counting
70             \code
71             typedef cds::container::RWQueue< Foo,
72                 typename cds::container::rwqueue::make_traits<
73                     cds::opt::item_counter< cds::atomicity::item_counter >,
74                     cds::opt::lock_type< std::mutex >
75                 >::type
76             > myQueue;
77             \endcode
78         */
79         template <typename... Options>
80         struct make_traits {
81 #   ifdef CDS_DOXYGEN_INVOKED
82             typedef implementation_defined type;   ///< Metafunction result
83 #   else
84             typedef typename cds::opt::make_options<
85                 typename cds::opt::find_type_traits< traits, Options... >::type
86                 , Options...
87             >::type type;
88 #   endif
89         };
90
91     } // namespace rwqueue
92
93     /// Michael & Scott blocking queue with fine-grained synchronization schema
94     /** @ingroup cds_nonintrusive_queue
95         The queue has two different locks: one for reading and one for writing.
96         Therefore, one writer and one reader can simultaneously access to the queue.
97         The queue does not require any garbage collector.
98
99         <b>Source</b>
100             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
101                 and blocking concurrent queue algorithms"
102
103         <b>Template arguments</b>
104         - \p T - value type to be stored in the queue
105         - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
106             metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
107             \code
108             struct myTraits: public cds::container::rwqueue::traits {
109                 typedef cds::atomicity::item_counter    item_counter;
110             };
111             typedef cds::container::RWQueue< Foo, myTraits > myQueue;
112
113             // Equivalent make_traits example:
114             typedef cds::container::RWQueue< Foo,
115                 typename cds::container::rwqueue::make_traits<
116                     cds::opt::item_counter< cds::atomicity::item_counter >
117                 >::type
118             > myQueue;
119             \endcode
120     */
121     template <typename T, typename Traits = rwqueue::traits >
122     class RWQueue
123     {
124     public:
125         /// Rebind template arguments
126         template <typename T2, typename Traits2>
127         struct rebind {
128             typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
129         };
130
131     public:
132         typedef T       value_type; ///< Type of value to be stored in the queue
133         typedef Traits  traits;     ///< Queue traits
134
135         typedef typename traits::lock_type    lock_type;    ///< Locking primitive
136         typedef typename traits::item_counter item_counter; ///< Item counting policy used
137
138     protected:
139         //@cond
140         /// Node type
141         struct node_type
142         {
143             atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
144             value_type              m_value;        ///< Value stored in the node
145
146             node_type( value_type const& v )
147                 : m_pNext( nullptr )
148                 , m_value(v)
149             {}
150
151             node_type()
152                 : m_pNext( nullptr )
153             {}
154
155             template <typename... Args>
156             node_type( Args&&... args )
157                 : m_pNext( nullptr )
158                 , m_value( std::forward<Args>(args)...)
159             {}
160         };
161         //@endcond
162
163     public:
164         typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
165
166     protected:
167         //@cond
168         typedef std::unique_lock<lock_type> scoped_lock;
169         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
170
171         struct head_type {
172             mutable lock_type lock;
173             node_type *       ptr;
174         };
175
176         head_type m_Head;
177         typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
178         head_type m_Tail;
179
180         item_counter    m_ItemCounter;
181         //@endcond
182
183     protected:
184         //@cond
185         static node_type * alloc_node()
186         {
187             return node_allocator().New();
188         }
189
190         static node_type * alloc_node( T const& data )
191         {
192             return node_allocator().New( data );
193         }
194
195         template <typename... Args>
196         static node_type * alloc_node_move( Args&&... args )
197         {
198             return node_allocator().MoveNew( std::forward<Args>( args )... );
199         }
200
201         static void free_node( node_type * pNode )
202         {
203             node_allocator().Delete( pNode );
204         }
205
206         bool enqueue_node( node_type * p )
207         {
208             assert( p != nullptr );
209             {
210                 scoped_lock lock( m_Tail.lock );
211                 m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
212                 m_Tail.ptr = p;
213             }
214             ++m_ItemCounter;
215             return true;
216         }
217
218         struct node_disposer {
219             void operator()( node_type * pNode )
220             {
221                 free_node( pNode );
222             }
223         };
224         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
225         //@endcond
226
227     public:
228         /// Makes empty queue
229         RWQueue()
230         {
231             node_type * pNode = alloc_node();
232             m_Head.ptr =
233                 m_Tail.ptr = pNode;
234         }
235
236         /// Destructor clears queue
237         ~RWQueue()
238         {
239             clear();
240             assert( m_Head.ptr == m_Tail.ptr );
241             free_node( m_Head.ptr );
242         }
243
244         /// Enqueues \p data. Always return \a true
245         bool enqueue( value_type const& data )
246         {
247             scoped_node_ptr p( alloc_node( data ));
248             if ( enqueue_node( p.get())) {
249                 p.release();
250                 return true;
251             }
252             return false;
253         }
254
255         /// Enqueues \p data, move semantics
256         bool enqueue( value_type&& data )
257         {
258             scoped_node_ptr p( alloc_node_move( std::move( data )));
259             if ( enqueue_node( p.get())) {
260                 p.release();
261                 return true;
262             }
263             return false;
264         }
265
266         /// Enqueues \p data to the queue using a functor
267         /**
268             \p Func is a functor called to create node.
269             The functor \p f takes one argument - a reference to a new node of type \ref value_type :
270             \code
271             cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
272             Bar bar;
273             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
274             \endcode
275         */
276         template <typename Func>
277         bool enqueue_with( Func f )
278         {
279             scoped_node_ptr p( alloc_node());
280             f( p->m_value );
281             if ( enqueue_node( p.get())) {
282                 p.release();
283                 return true;
284             }
285             return false;
286         }
287
288         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
289         template <typename... Args>
290         bool emplace( Args&&... args )
291         {
292             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
293             if ( enqueue_node( p.get())) {
294                 p.release();
295                 return true;
296             }
297             return false;
298         }
299
300         /// Synonym for \p enqueue( value_type const& ) function
301         bool push( value_type const& val )
302         {
303             return enqueue( val );
304         }
305
306         /// Synonym for \p enqueue( value_type&& ) function
307         bool push( value_type&& val )
308         {
309             return enqueue( std::move( val ));
310         }
311
312         /// Synonym for \p enqueue_with() function
313         template <typename Func>
314         bool push_with( Func f )
315         {
316             return enqueue_with( f );
317         }
318
319         /// Dequeues a value to \p dest.
320         /**
321             If queue is empty returns \a false, \p dest can be corrupted.
322             If queue is not empty returns \a true, \p dest contains the value dequeued
323         */
324         bool dequeue( value_type& dest )
325         {
326             return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
327         }
328
329         /// Dequeues a value using a functor
330         /**
331             \p Func is a functor called to copy dequeued value.
332             The functor takes one argument - a reference to removed node:
333             \code
334             cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
335             Bar bar;
336             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
337             \endcode
338             The functor is called only if the queue is not empty.
339         */
340         template <typename Func>
341         bool dequeue_with( Func f )
342         {
343             node_type * pNode;
344             {
345                 scoped_lock lock( m_Head.lock );
346                 pNode = m_Head.ptr;
347                 node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
348                 if ( pNewHead == nullptr )
349                     return false;
350                 f( pNewHead->m_value );
351                 m_Head.ptr = pNewHead;
352             }    // unlock here
353             --m_ItemCounter;
354             free_node( pNode );
355             return true;
356         }
357
358         /// Synonym for \p dequeue() function
359         bool pop( value_type& dest )
360         {
361             return dequeue( dest );
362         }
363
364         /// Synonym for \p dequeue_with() function
365         template <typename Func>
366         bool pop_with( Func f )
367         {
368             return dequeue_with( f );
369         }
370
371         /// Checks if queue is empty
372         bool empty() const
373         {
374             scoped_lock lock( m_Head.lock );
375             return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
376         }
377
378         /// Clears queue
379         void clear()
380         {
381             scoped_lock lockR( m_Head.lock );
382             scoped_lock lockW( m_Tail.lock );
383             while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
384                 node_type * pHead = m_Head.ptr;
385                 m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
386                 free_node( pHead );
387             }
388             m_ItemCounter.reset();
389         }
390
391         /// Returns queue's item count
392         /**
393             The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
394             this function always returns 0.
395
396             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
397             is empty. To check queue emptyness use \p empty() method.
398         */
399         size_t    size() const
400         {
401             return m_ItemCounter.value();
402         }
403
404         //@cond
405         /// The class has no internal statistics. For test consistency only
406         std::nullptr_t statistics() const
407         {
408             return nullptr;
409         }
410         //@endcond
411     };
412
413 }}  // namespace cds::container
414
415 #endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H