Added copyright and license
[libcds.git] / cds / container / rwqueue.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_CONTAINER_RWQUEUE_H
32 #define CDSLIB_CONTAINER_RWQUEUE_H
33
34 #include <mutex>        // unique_lock
35 #include <cds/sync/spinlock.h>
36 #include <cds/opt/options.h>
37 #include <cds/details/allocator.h>
38
39 namespace cds { namespace container {
40     /// RWQueue related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace rwqueue {
44         /// RWQueue default type traits
45         struct traits
46         {
47             /// Lock policy
48             typedef cds::sync::spin  lock_type;
49
50             /// Node allocator
51             typedef CDS_DEFAULT_ALLOCATOR   allocator;
52
53             /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
54             typedef cds::atomicity::empty_item_counter item_counter;
55
56             /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
57             enum { padding = opt::cache_line_padding };
58         };
59
60         /// Metafunction converting option list to \p rwqueue::traits
61         /**
62             Supported \p Options are:
63             - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
64             - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
65             - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
66                 To enable item counting use \p cds::atomicity::item_counter.
67             - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
68
69             Example: declare mutex-based \p %RWQueue with item counting
70             \code
71             typedef cds::container::RWQueue< Foo,
72                 typename cds::container::rwqueue::make_traits<
73                     cds::opt::item_counter< cds::atomicity::item_counter >,
74                     cds::opt::lock_type< std::mutex >
75                 >::type
76             > myQueue;
77             \endcode
78         */
79         template <typename... Options>
80         struct make_traits {
81 #   ifdef CDS_DOXYGEN_INVOKED
82             typedef implementation_defined type;   ///< Metafunction result
83 #   else
84             typedef typename cds::opt::make_options<
85                 typename cds::opt::find_type_traits< traits, Options... >::type
86                 , Options...
87             >::type type;
88 #   endif
89         };
90
91     } // namespace rwqueue
92
93     /// Michael & Scott blocking queue with fine-grained synchronization schema
94     /** @ingroup cds_nonintrusive_queue
95         The queue has two different locks: one for reading and one for writing.
96         Therefore, one writer and one reader can simultaneously access to the queue.
97         The queue does not require any garbage collector.
98
99         <b>Source</b>
100             - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
101                 and blocking concurrent queue algorithms"
102
103         <b>Template arguments</b>
104         - \p T - value type to be stored in the queue
105         - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
106             metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
107             \code
108             struct myTraits: public cds::container::rwqueue::traits {
109                 typedef cds::atomicity::item_counter    item_counter;
110             };
111             typedef cds::container::RWQueue< Foo, myTraits > myQueue;
112
113             // Equivalent make_traits example:
114             typedef cds::container::RWQueue< Foo,
115                 typename cds::container::rwqueue::make_traits<
116                     cds::opt::item_counter< cds::atomicity::item_counter >
117                 >::type
118             > myQueue;
119             \endcode
120     */
121     template <typename T, typename Traits = rwqueue::traits >
122     class RWQueue
123     {
124     public:
125         /// Rebind template arguments
126         template <typename T2, typename Traits2>
127         struct rebind {
128             typedef RWQueue< T2, Traits2 > other   ;   ///< Rebinding result
129         };
130
131     public:
132         typedef T       value_type; ///< Type of value to be stored in the queue
133         typedef Traits  traits;     ///< Queue traits
134
135         typedef typename traits::lock_type  lock_type;      ///< Locking primitive
136         typedef typename traits::item_counter item_counter; ///< Item counting policy used
137
138     protected:
139         //@cond
140         /// Node type
141         struct node_type
142         {
143             atomics::atomic< node_type *> m_pNext;  ///< Pointer to the next node in the queue
144             value_type              m_value;        ///< Value stored in the node
145
146             node_type( value_type const& v )
147                 : m_pNext( nullptr )
148                 , m_value(v)
149             {}
150
151             node_type()
152                 : m_pNext( nullptr )
153             {}
154
155             template <typename... Args>
156             node_type( Args&&... args )
157                 : m_pNext( nullptr )
158                 , m_value( std::forward<Args>(args)...)
159             {}
160         };
161         //@endcond
162
163     public:
164         typedef typename traits::allocator::template rebind<node_type>::other allocator_type; ///< Allocator type used for allocate/deallocate the queue nodes
165
166     protected:
167         //@cond
168         typedef std::unique_lock<lock_type> scoped_lock;
169         typedef cds::details::Allocator< node_type, allocator_type >  node_allocator;
170
171         struct head_type {
172             mutable lock_type lock;
173             node_type *       ptr;
174         };
175
176         head_type m_Head;
177         typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
178         head_type m_Tail;
179
180         item_counter    m_ItemCounter;
181         //@endcond
182
183     protected:
184         //@cond
185         static node_type * alloc_node()
186         {
187             return node_allocator().New();
188         }
189
190         static node_type * alloc_node( T const& data )
191         {
192             return node_allocator().New( data );
193         }
194
195         template <typename... Args>
196         static node_type * alloc_node_move( Args&&... args )
197         {
198             return node_allocator().MoveNew( std::forward<Args>( args )... );
199         }
200
201         static void free_node( node_type * pNode )
202         {
203             node_allocator().Delete( pNode );
204         }
205
206         bool enqueue_node( node_type * p )
207         {
208             assert( p != nullptr );
209             {
210                 scoped_lock lock( m_Tail.lock );
211                 m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
212                 m_Tail.ptr = p;
213             }
214             ++m_ItemCounter;
215             return true;
216         }
217
218         struct node_disposer {
219             void operator()( node_type * pNode )
220             {
221                 free_node( pNode );
222             }
223         };
224         typedef std::unique_ptr< node_type, node_disposer >     scoped_node_ptr;
225         //@endcond
226
227     public:
228         /// Makes empty queue
229         RWQueue()
230         {
231             node_type * pNode = alloc_node();
232             m_Head.ptr =
233                 m_Tail.ptr = pNode;
234         }
235
236         /// Destructor clears queue
237         ~RWQueue()
238         {
239             clear();
240             assert( m_Head.ptr == m_Tail.ptr );
241             free_node( m_Head.ptr );
242         }
243
244         /// Enqueues \p data. Always return \a true
245         bool enqueue( value_type const& data )
246         {
247             scoped_node_ptr p( alloc_node( data ));
248             if ( enqueue_node( p.get() )) {
249                 p.release();
250                 return true;
251             }
252             return false;
253         }
254
255         /// Enqueues \p data to the queue using a functor
256         /**
257             \p Func is a functor called to create node.
258             The functor \p f takes one argument - a reference to a new node of type \ref value_type :
259             \code
260             cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
261             Bar bar;
262             myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
263             \endcode
264         */
265         template <typename Func>
266         bool enqueue_with( Func f )
267         {
268             scoped_node_ptr p( alloc_node() );
269             f( p->m_value );
270             if ( enqueue_node( p.get() )) {
271                 p.release();
272                 return true;
273             }
274             return false;
275         }
276
277         /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
278         template <typename... Args>
279         bool emplace( Args&&... args )
280         {
281             scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
282             if ( enqueue_node( p.get() )) {
283                 p.release();
284                 return true;
285             }
286             return false;
287         }
288
289         /// Synonym for \p enqueue() function
290         bool push( value_type const& val )
291         {
292             return enqueue( val );
293         }
294
295         /// Synonym for \p enqueue_with() function
296         template <typename Func>
297         bool push_with( Func f )
298         {
299             return enqueue_with( f );
300         }
301
302         /// Dequeues a value to \p dest.
303         /**
304             If queue is empty returns \a false, \p dest can be corrupted.
305             If queue is not empty returns \a true, \p dest contains the value dequeued
306         */
307         bool dequeue( value_type& dest )
308         {
309             return dequeue_with( [&dest]( value_type& src ) { dest = src; } );
310         }
311
312         /// Dequeues a value using a functor
313         /**
314             \p Func is a functor called to copy dequeued value.
315             The functor takes one argument - a reference to removed node:
316             \code
317             cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
318             Bar bar;
319             myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
320             \endcode
321             The functor is called only if the queue is not empty.
322         */
323         template <typename Func>
324         bool dequeue_with( Func f )
325         {
326             node_type * pNode;
327             {
328                 scoped_lock lock( m_Head.lock );
329                 pNode = m_Head.ptr;
330                 node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
331                 if ( pNewHead == nullptr )
332                     return false;
333                 f( pNewHead->m_value );
334                 m_Head.ptr = pNewHead;
335             }    // unlock here
336             --m_ItemCounter;
337             free_node( pNode );
338             return true;
339         }
340
341         /// Synonym for \p dequeue() function
342         bool pop( value_type& dest )
343         {
344             return dequeue( dest );
345         }
346
347         /// Synonym for \p dequeue_with() function
348         template <typename Func>
349         bool pop_with( Func f )
350         {
351             return dequeue_with( f );
352         }
353
354         /// Checks if queue is empty
355         bool empty() const
356         {
357             scoped_lock lock( m_Head.lock );
358             return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
359         }
360
361         /// Clears queue
362         void clear()
363         {
364             scoped_lock lockR( m_Head.lock );
365             scoped_lock lockW( m_Tail.lock );
366             while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
367                 node_type * pHead = m_Head.ptr;
368                 m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
369                 free_node( pHead );
370             }
371         }
372
373         /// Returns queue's item count
374         /**
375             The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
376             this function always returns 0.
377
378             @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
379             is empty. To check queue emptyness use \p empty() method.
380         */
381         size_t    size() const
382         {
383             return m_ItemCounter.value();
384         }
385
386         //@cond
387         /// The class has no internal statistics. For test consistency only
388         std::nullptr_t statistics() const
389         {
390             return nullptr;
391         }
392         //@endcond
393     };
394
395 }}  // namespace cds::container
396
397 #endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H