Fixed missing acquire read in Flat-Combining algorithm (found by TSan)
[libcds.git] / cds / container / fcdeque.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_CONTAINER_FCDEQUE_H
32 #define CDSLIB_CONTAINER_FCDEQUE_H
33
34 #include <cds/algo/flat_combining.h>
35 #include <cds/algo/elimination_opt.h>
36 #include <deque>
37
38 namespace cds { namespace container {
39
40     /// FCDeque related definitions
41     /** @ingroup cds_nonintrusive_helper
42     */
43     namespace fcdeque {
44
45         /// FCDeque internal statistics
46         template <typename Counter = cds::atomicity::event_counter >
47         struct stat: public cds::algo::flat_combining::stat<Counter>
48         {
49             typedef cds::algo::flat_combining::stat<Counter>    flat_combining_stat; ///< Flat-combining statistics
50             typedef typename flat_combining_stat::counter_type  counter_type;        ///< Counter type
51
52             counter_type    m_nPushFront     ;  ///< Count of push_front operations
53             counter_type    m_nPushFrontMove ;  ///< Count of push_front operations with move semantics
54             counter_type    m_nPushBack      ;  ///< Count of push_back operations
55             counter_type    m_nPushBackMove  ;  ///< Count of push_back operations with move semantics
56             counter_type    m_nPopFront      ;  ///< Count of success pop_front operations
57             counter_type    m_nFailedPopFront;  ///< Count of failed pop_front operations (pop from empty deque)
58             counter_type    m_nPopBack       ;  ///< Count of success pop_back operations
59             counter_type    m_nFailedPopBack ;  ///< Count of failed pop_back operations (pop from empty deque)
60             counter_type    m_nCollided      ;  ///< How many pairs of push/pop were collided, if elimination is enabled
61
62             //@cond
63             void    onPushFront()             { ++m_nPushFront; }
64             void    onPushFrontMove()         { ++m_nPushFrontMove; }
65             void    onPushBack()              { ++m_nPushBack; }
66             void    onPushBackMove()          { ++m_nPushBackMove; }
67             void    onPopFront( bool bFailed ) { if ( bFailed ) ++m_nFailedPopFront; else ++m_nPopFront;  }
68             void    onPopBack( bool bFailed ) { if ( bFailed ) ++m_nFailedPopBack; else ++m_nPopBack;  }
69             void    onCollide()               { ++m_nCollided; }
70             //@endcond
71         };
72
73         /// FCDeque dummy statistics, no overhead
74         struct empty_stat: public cds::algo::flat_combining::empty_stat
75         {
76             //@cond
77             void    onPushFront()       {}
78             void    onPushFrontMove()   {}
79             void    onPushBack()        {}
80             void    onPushBackMove()    {}
81             void    onPopFront(bool)    {}
82             void    onPopBack(bool)     {}
83             void    onCollide()         {}
84             //@endcond
85         };
86
87         /// FCDeque type traits
88         struct traits: public cds::algo::flat_combining::traits
89         {
90             typedef empty_stat      stat;   ///< Internal statistics
91             static CDS_CONSTEXPR const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
92         };
93
94         /// Metafunction converting option list to traits
95         /**
96             \p Options are:
97             - any \p cds::algo::flat_combining::make_traits options
98             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
99             - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
100                 By default, the elimination is disabled. For queue, the elimination is possible if the queue
101                 is empty.
102         */
103         template <typename... Options>
104         struct make_traits {
105 #   ifdef CDS_DOXYGEN_INVOKED
106             typedef implementation_defined type ;   ///< Metafunction result
107 #   else
108             typedef typename cds::opt::make_options<
109                 typename cds::opt::find_type_traits< traits, Options... >::type
110                 ,Options...
111             >::type   type;
112 #   endif
113         };
114
115     } // namespace fcqueue
116
117     /// Flat-combining deque
118     /**
119         @ingroup cds_nonintrusive_deque
120         @ingroup cds_flat_combining_container
121
122         \ref cds_flat_combining_description "Flat combining" sequential deque.
123         The class can be considered as a concurrent FC-based wrapper for \p std::deque.
124
125         Template parameters:
126         - \p T - a value type stored in the deque
127         - \p Deque - sequential deque implementation, for example, \p std::deque<T> (the default)
128             or \p boost::container::deque
129         - \p Trats - type traits of flat combining, default is \p fcdeque::traits.
130             \p fcdeque::make_traits metafunction can be used to construct specialized \p %fcdeque::traits
131     */
132     template <typename T,
133         class Deque = std::deque<T>,
134         typename Traits = fcdeque::traits
135     >
136     class FCDeque
137 #ifndef CDS_DOXYGEN_INVOKED
138         : public cds::algo::flat_combining::container
139 #endif
140     {
141     public:
142         typedef T           value_type;     ///< Value type
143         typedef Deque       deque_type;     ///< Sequential deque class
144         typedef Traits      traits;         ///< Deque type traits
145
146         typedef typename traits::stat  stat;   ///< Internal statistics type
147         static CDS_CONSTEXPR const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
148
149     protected:
150         //@cond
151         /// Deque operation IDs
152         enum fc_operation {
153             op_push_front = cds::algo::flat_combining::req_Operation, ///< Push front
154             op_push_front_move,     ///< Push front (move semantics)
155             op_push_back,           ///< Push back
156             op_push_back_move,      ///< Push back (move semantics)
157             op_pop_front,           ///< Pop front
158             op_pop_back,            ///< Pop back
159             op_clear                ///< Clear
160         };
161
162         /// Flat combining publication list record
163         struct fc_record: public cds::algo::flat_combining::publication_record
164         {
165             union {
166                 value_type const *  pValPush; ///< Value to push
167                 value_type *        pValPop;  ///< Pop destination
168             };
169             bool            bEmpty; ///< \p true if the deque is empty
170         };
171         //@endcond
172
173         /// Flat combining kernel
174         typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
175
176     protected:
177         //@cond
178         mutable fc_kernel m_FlatCombining;
179         deque_type        m_Deque;
180         //@endcond
181
182     public:
183         /// Initializes empty deque object
184         FCDeque()
185         {}
186
187         /// Initializes empty deque object and gives flat combining parameters
188         FCDeque(
189             unsigned int nCompactFactor     ///< Flat combining: publication list compacting factor
190             ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
191             )
192             : m_FlatCombining( nCompactFactor, nCombinePassCount )
193         {}
194
195         /// Inserts a new element at the beginning of the deque container
196         /**
197             The function always returns \p true
198         */
199         bool push_front(
200             value_type const& val ///< Value to be copied to inserted element
201         )
202         {
203             auto pRec = m_FlatCombining.acquire_record();
204             pRec->pValPush = &val;
205
206             if ( c_bEliminationEnabled )
207                 m_FlatCombining.batch_combine( op_push_front, pRec, *this );
208             else
209                 m_FlatCombining.combine( op_push_front, pRec, *this );
210
211             assert( pRec->is_done());
212             m_FlatCombining.release_record( pRec );
213             m_FlatCombining.internal_statistics().onPushFront();
214             return true;
215         }
216
217         /// Inserts a new element at the beginning of the deque container (move semantics)
218         /**
219             The function always returns \p true
220         */
221         bool push_front(
222             value_type&& val ///< Value to be moved to inserted element
223         )
224         {
225             auto pRec = m_FlatCombining.acquire_record();
226             pRec->pValPush = &val;
227
228             if ( c_bEliminationEnabled )
229                 m_FlatCombining.batch_combine( op_push_front_move, pRec, *this );
230             else
231                 m_FlatCombining.combine( op_push_front_move, pRec, *this );
232
233             assert( pRec->is_done());
234             m_FlatCombining.release_record( pRec );
235             m_FlatCombining.internal_statistics().onPushFrontMove();
236             return true;
237         }
238
239         /// Inserts a new element at the end of the deque container
240         /**
241             The function always returns \p true
242         */
243         bool push_back(
244             value_type const& val ///< Value to be copied to inserted element
245         )
246         {
247             auto pRec = m_FlatCombining.acquire_record();
248             pRec->pValPush = &val;
249
250             if ( c_bEliminationEnabled )
251                 m_FlatCombining.batch_combine( op_push_back, pRec, *this );
252             else
253                 m_FlatCombining.combine( op_push_back, pRec, *this );
254
255             assert( pRec->is_done());
256             m_FlatCombining.release_record( pRec );
257             m_FlatCombining.internal_statistics().onPushBack();
258             return true;
259         }
260
261         /// Inserts a new element at the end of the deque container (move semantics)
262         /**
263             The function always returns \p true
264         */
265         bool push_back(
266             value_type&& val ///< Value to be moved to inserted element
267         )
268         {
269             auto pRec = m_FlatCombining.acquire_record();
270             pRec->pValPush = &val;
271
272             if ( c_bEliminationEnabled )
273                 m_FlatCombining.batch_combine( op_push_back_move, pRec, *this );
274             else
275                 m_FlatCombining.combine( op_push_back_move, pRec, *this );
276
277             assert( pRec->is_done());
278             m_FlatCombining.release_record( pRec );
279             m_FlatCombining.internal_statistics().onPushBackMove();
280             return true;
281         }
282
283         /// Removes the first element in the deque container
284         /**
285             The function returns \p false if the deque is empty, \p true otherwise.
286             If the deque is empty \p val is not changed.
287         */
288         bool pop_front(
289             value_type& val ///< Target to be received the copy of removed element
290         )
291         {
292             auto pRec = m_FlatCombining.acquire_record();
293             pRec->pValPop = &val;
294
295             if ( c_bEliminationEnabled )
296                 m_FlatCombining.batch_combine( op_pop_front, pRec, *this );
297             else
298                 m_FlatCombining.combine( op_pop_front, pRec, *this );
299
300             assert( pRec->is_done());
301             m_FlatCombining.release_record( pRec );
302             m_FlatCombining.internal_statistics().onPopFront( pRec->bEmpty );
303             return !pRec->bEmpty;
304         }
305
306         /// Removes the last element in the deque container
307         /**
308             The function returns \p false if the deque is empty, \p true otherwise.
309             If the deque is empty \p val is not changed.
310         */
311         bool pop_back(
312             value_type& val ///< Target to be received the copy of removed element
313         )
314         {
315             auto pRec = m_FlatCombining.acquire_record();
316             pRec->pValPop = &val;
317
318             if ( c_bEliminationEnabled )
319                 m_FlatCombining.batch_combine( op_pop_back, pRec, *this );
320             else
321                 m_FlatCombining.combine( op_pop_back, pRec, *this );
322
323             assert( pRec->is_done());
324             m_FlatCombining.release_record( pRec );
325             m_FlatCombining.internal_statistics().onPopBack( pRec->bEmpty );
326             return !pRec->bEmpty;
327         }
328
329         /// Clears the deque
330         void clear()
331         {
332             auto pRec = m_FlatCombining.acquire_record();
333
334             if ( c_bEliminationEnabled )
335                 m_FlatCombining.batch_combine( op_clear, pRec, *this );
336             else
337                 m_FlatCombining.combine( op_clear, pRec, *this );
338
339             assert( pRec->is_done());
340             m_FlatCombining.release_record( pRec );
341         }
342
343         /// Returns the number of elements in the deque.
344         /**
345             Note that <tt>size() == 0</tt> is not mean that the deque is empty because
346             combining record can be in process.
347             To check emptiness use \ref empty function.
348         */
349         size_t size() const
350         {
351             return m_Deque.size();
352         }
353
354         /// Checks if the deque is empty
355         /**
356             If the combining is in process the function waits while combining done.
357         */
358         bool empty() const
359         {
360             bool bRet = false;
361             auto const& deq = m_Deque;
362             m_FlatCombining.invoke_exclusive( [&deq, &bRet]() { bRet = deq.empty(); } );
363             return bRet;
364         }
365
366         /// Internal statistics
367         stat const& statistics() const
368         {
369             return m_FlatCombining.statistics();
370         }
371
372     public: // flat combining cooperation, not for direct use!
373         //@cond
374         /// Flat combining supporting function. Do not call it directly!
375         /**
376             The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
377             object if the current thread becomes a combiner. Invocation of the function means that
378             the deque should perform an action recorded in \p pRec.
379         */
380         void fc_apply( fc_record * pRec )
381         {
382             assert( pRec );
383
384             switch ( pRec->op()) {
385             case op_push_front:
386                 assert( pRec->pValPush );
387                 m_Deque.push_front( *(pRec->pValPush));
388                 break;
389             case op_push_front_move:
390                 assert( pRec->pValPush );
391                 m_Deque.push_front( std::move( *(pRec->pValPush )));
392                 break;
393             case op_push_back:
394                 assert( pRec->pValPush );
395                 m_Deque.push_back( *(pRec->pValPush));
396                 break;
397             case op_push_back_move:
398                 assert( pRec->pValPush );
399                 m_Deque.push_back( std::move( *(pRec->pValPush )));
400                 break;
401             case op_pop_front:
402                 assert( pRec->pValPop );
403                 pRec->bEmpty = m_Deque.empty();
404                 if ( !pRec->bEmpty ) {
405                     *(pRec->pValPop) = std::move( m_Deque.front());
406                     m_Deque.pop_front();
407                 }
408                 break;
409             case op_pop_back:
410                 assert( pRec->pValPop );
411                 pRec->bEmpty = m_Deque.empty();
412                 if ( !pRec->bEmpty ) {
413                     *(pRec->pValPop) = std::move( m_Deque.back());
414                     m_Deque.pop_back();
415                 }
416                 break;
417             case op_clear:
418                 while ( !m_Deque.empty())
419                     m_Deque.pop_front();
420                 break;
421             default:
422                 assert(false);
423                 break;
424             }
425         }
426
427         /// Batch-processing flat combining
428         void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
429         {
430             typedef typename fc_kernel::iterator fc_iterator;
431
432             for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
433                 switch ( it->op( atomics::memory_order_acquire )) {
434                 case op_push_front:
435                     if ( itPrev != itEnd
436                         && (itPrev->op() == op_pop_front || (m_Deque.empty() && itPrev->op() == op_pop_back)))
437                     {
438                         collide( *it, *itPrev );
439                         itPrev = itEnd;
440                     }
441                     else
442                         itPrev = it;
443                     break;
444                 case op_push_front_move:
445                     if ( itPrev != itEnd
446                       && (itPrev->op() == op_pop_front || ( m_Deque.empty() && itPrev->op() == op_pop_back )))
447                     {
448                         collide_move( *it, *itPrev );
449                         itPrev = itEnd;
450                     }
451                     else
452                         itPrev = it;
453                     break;
454                 case op_push_back:
455                     if ( itPrev != itEnd
456                         && (itPrev->op() == op_pop_back || (m_Deque.empty() && itPrev->op() == op_pop_front)))
457                     {
458                         collide( *it, *itPrev );
459                         itPrev = itEnd;
460                     }
461                     else
462                         itPrev = it;
463                     break;
464                 case op_push_back_move:
465                     if ( itPrev != itEnd
466                         && (itPrev->op() == op_pop_back || ( m_Deque.empty() && itPrev->op() == op_pop_front )))
467                     {
468                         collide_move( *it, *itPrev );
469                         itPrev = itEnd;
470                     }
471                     else
472                         itPrev = it;
473                     break;
474                 case op_pop_front:
475                     if ( itPrev != itEnd ) {
476                         if ( m_Deque.empty()) {
477                             switch ( itPrev->op()) {
478                             case op_push_back:
479                                 collide( *itPrev, *it );
480                                 itPrev = itEnd;
481                                 break;
482                             case op_push_back_move:
483                                 collide_move( *itPrev, *it );
484                                 itPrev = itEnd;
485                                 break;
486                             default:
487                                 itPrev = it;
488                                 break;
489                             }
490                         }
491                         else {
492                             switch ( itPrev->op()) {
493                             case op_push_front:
494                                 collide( *itPrev, *it );
495                                 itPrev = itEnd;
496                                 break;
497                             case op_push_front_move:
498                                 collide_move( *itPrev, *it );
499                                 itPrev = itEnd;
500                                 break;
501                             default:
502                                 itPrev = it;
503                                 break;
504                             }
505                         }
506                     }
507                     else
508                         itPrev = it;
509                     break;
510                 case op_pop_back:
511                     if ( itPrev != itEnd ) {
512                         if ( m_Deque.empty()) {
513                             switch ( itPrev->op()) {
514                             case op_push_front:
515                                 collide( *itPrev, *it );
516                                 itPrev = itEnd;
517                                 break;
518                             case op_push_front_move:
519                                 collide_move( *itPrev, *it );
520                                 itPrev = itEnd;
521                                 break;
522                             default:
523                                 itPrev = it;
524                                 break;
525                             }
526                         }
527                         else {
528                             switch ( itPrev->op()) {
529                             case op_push_back:
530                                 collide( *itPrev, *it );
531                                 itPrev = itEnd;
532                                 break;
533                             case op_push_back_move:
534                                 collide_move( *itPrev, *it );
535                                 itPrev = itEnd;
536                                 break;
537                             default:
538                                 itPrev = it;
539                                 break;
540                             }
541                         }
542                     }
543                     else
544                         itPrev = it;
545                     break;
546                 }
547             }
548         }
549         //@endcond
550
551     private:
552         //@cond
553         void collide( fc_record& recPush, fc_record& recPop )
554         {
555             *(recPop.pValPop) = *(recPush.pValPush);
556             recPop.bEmpty = false;
557             m_FlatCombining.operation_done( recPush );
558             m_FlatCombining.operation_done( recPop );
559             m_FlatCombining.internal_statistics().onCollide();
560         }
561
562         void collide_move( fc_record& recPush, fc_record& recPop )
563         {
564             *(recPop.pValPop) = std::move( *(recPush.pValPush));
565             recPop.bEmpty = false;
566             m_FlatCombining.operation_done( recPush );
567             m_FlatCombining.operation_done( recPop );
568             m_FlatCombining.internal_statistics().onCollide();
569         }
570         //@endcond
571     };
572
573 }} // namespace cds::container
574
575 #endif // #ifndef CDSLIB_CONTAINER_FCDEQUE_H