Fixed memory leaks in flat-combining algo
[libcds.git] / cds / algo / flat_combining.h
1 //$$CDS-header$$
2
3 #ifndef CDSLIB_ALGO_FLAT_COMBINING_H
4 #define CDSLIB_ALGO_FLAT_COMBINING_H
5
6 #include <mutex>
7 #include <cds/algo/atomic.h>
8 #include <cds/details/allocator.h>
9 #include <cds/algo/backoff_strategy.h>
10 #include <cds/sync/spinlock.h>
11 #include <cds/opt/options.h>
12 #include <cds/algo/int_algo.h>
13 #include <boost/thread/tss.hpp>     // thread_specific_ptr
14
15 namespace cds { namespace algo {
16
17     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
18     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
19
20     /// Flat combining
21     /**
22         @anchor cds_flat_combining_description
        The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their paper
        [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
25         The technique converts a sequential data structure to its concurrent implementation.
26         A few structures are added to the sequential implementation: a <i>global lock</i>,
27         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
28         of a <i>publication list</i>. The publication list is a list of thread-local records
29         of a size proportional to the number of threads that are concurrently accessing the shared object.
30
31         Each thread \p t accessing the structure to perform an invocation of some method \p m
32         on the shared object executes the following sequence of steps:
33         <ol>
34         <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
35         sequentially to the shared object in the <i>request</i> field of your thread local publication
36         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
37         be used to receive the response. If your thread local publication record is marked as active
38         continue to step 2, otherwise continue to step 5.</li>
39         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
40         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
41         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
42         record is active. If your record is inactive proceed to step 5. Once the response is available,
43         reset the request field to null and return the response.</li>
44         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
45         return to spinning in step 2.</li>
46         <li>Otherwise, you hold the lock and are a combiner.
47         <ul>
48             <li>Increment the combining pass count by one.</li>
49             <li>Execute a \p fc_apply() by traversing the publication list from the head,
50             combining all nonnull method call invocations, setting the <i>age</i> of each of these records
51             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
52             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
53             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
54             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
55             by the head in the list), remove from the publication list all records whose <i>age</i> is
56             much smaller than the current <i>count</i>. This is done by removing the node and marking it
57             as inactive.</li>
58             <li>Release the lock.</li>
59         </ul>
60         <li>If you have no thread local publication record allocate one, marked as active. If you already
61         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
62         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
63         </ol>
64
65         As the test results show, the flat combining technique is suitable for non-intrusive containers
66         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
67         less impressive results.
68
69         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
70
71         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
72     */
73     namespace flat_combining {
74
        /// Special values of publication_record::nRequest
        enum request_value
        {
            req_EmptyRecord,    ///< Publication record is empty: no operation is pending
            req_Response,       ///< Operation is done; the combiner has written the response

            req_Operation       ///< First operation id for derived classes (container op-codes start from this value)
        };
83
        /// publication_record state
        enum record_state {
            inactive,       ///< Record is inactive (not linked into the publication list)
            active,         ///< Record is active (linked into the publication list)
            removed         ///< Record should be removed (its owner thread has terminated; a combiner frees it later)
        };
90
        /// Record of publication list
        /**
            Each data structure based on flat combining contains a class derived from \p %publication_record.
            A record is thread-local (held in TLS by \ref kernel) and is linked into the shared publication list.
        */
        struct publication_record {
            atomics::atomic<unsigned int>    nRequest;   ///< Request field: \p req_xxx value or an operation id of the derived container
            atomics::atomic<unsigned int>    nState;     ///< Record state: inactive, active, removed
            atomics::atomic<unsigned int>    nAge;       ///< Age of the record: combining-pass count at last use, used for list compacting
            atomics::atomic<publication_record *> pNext; ///< Next record in publication list
            void *                              pOwner;    ///< [internal data] Pointer to \ref kernel object that manages the publication list (nullptr when the kernel has been destroyed)

            /// Initializes publication record as empty and inactive
            publication_record()
                : nRequest( req_EmptyRecord )
                , nState( inactive )
                , nAge(0)
                , pNext( nullptr )
                , pOwner( nullptr )
            {}

            /// Returns the value of \p nRequest field (relaxed load: a snapshot, not a synchronization point)
            unsigned int op() const
            {
                return nRequest.load( atomics::memory_order_relaxed );
            }

            /// Checks if the operation is done (the combiner has stored \p req_Response)
            bool is_done() const
            {
                return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
            }
        };
123
124         /// Flat combining internal statistics
125         template <typename Counter = cds::atomicity::event_counter >
126         struct stat
127         {
128             typedef Counter counter_type;   ///< Event counter type
129
130             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
131             counter_type    m_nCombiningCount   ;   ///< Combining call count
132             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
133             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
134             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
135             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
136             counter_type    m_nPubRecordDeteted ;   ///< Count of deleted publication records
137             counter_type    m_nAcquirePubRecCount;  ///< Count of acquiring publication record
138             counter_type    m_nReleasePubRecCount;  ///< Count on releasing publication record
139
140             /// Returns current combining factor
141             /**
142                 Combining factor is how many operations perform in one combine pass:
143                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
144             */
145             double combining_factor() const
146             {
147                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
148             }
149
150             //@cond
151             void    onOperation()               { ++m_nOperationCount; }
152             void    onCombining()               { ++m_nCombiningCount; }
153             void    onCompactPublicationList()  { ++m_nCompactPublicationList; }
154             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord; }
155             void    onActivatPubRecord()        { ++m_nActivatePubRecord; }
156             void    onCreatePubRecord()         { ++m_nPubRecordCreated; }
157             void    onDeletePubRecord()         { ++m_nPubRecordDeteted; }
158             void    onAcquirePubRecord()        { ++m_nAcquirePubRecCount; }
159             void    onReleasePubRecord()        { ++m_nReleasePubRecCount; }
160             //@endcond
161         };
162
163         /// Flat combining dummy internal statistics
164         struct empty_stat
165         {
166             //@cond
167             void    onOperation()               {}
168             void    onCombining()               {}
169             void    onCompactPublicationList()  {}
170             void    onDeactivatePubRecord()     {}
171             void    onActivatPubRecord()        {}
172             void    onCreatePubRecord()         {}
173             void    onDeletePubRecord()         {}
174             void    onAcquirePubRecord()        {}
175             void    onReleasePubRecord()        {}
176             //@endcond
177         };
178
        /// Type traits of \ref kernel class
        /**
            You can define different type traits for \ref kernel
            by specifying your struct based on \p %traits
            or by using \ref make_traits metafunction.
        */
        struct traits
        {
            typedef cds::sync::spin             lock_type;  ///< Lock type
            typedef cds::backoff::delay_of<2>   back_off;   ///< Back-off strategy
            typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating publication_record derivatives)
            typedef empty_stat                  stat;       ///< Internal statistics
            typedef opt::v::relaxed_ordering  memory_model; ///< C++ memory ordering model
        };
193
        /// Metafunction converting option list to traits
        /**
            \p Options are:
            - \p opt::lock_type - mutex type, default is \p cds::sync::spin
            - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
            - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
            - \p opt::memory_model - C++ memory ordering model.
                List of all available memory ordering see \p opt::memory_model.
                Default is \p cds::opt::v::relaxed_ordering
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type ;   ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                ,Options...
            >::type   type;
#   endif
        };
216
        /// The kernel of flat combining
        /**
            Template parameters:
            - \p PublicationRecord - a type derived from \ref publication_record
            - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
                \ref make_traits metafunction can be used to create type traits

            The kernel object should be a member of a container class. The container cooperates with flat combining
            kernel object. There are two ways to interact with the kernel:
            - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
              the container acquires its publication record by \p acquire_record(), fills its fields and calls
              \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
              calls \p fc_apply() function of the container for each active non-empty record. Then, the container
              should release its publication record by \p release_record(). Only one pass through the publication
              list is possible.
            - Batch processing - \p batch_combine() function. It this mode the container obtains access
              to entire publication list. This mode allows the container to perform an elimination, for example,
              the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
              the container acquires its publication record by \p acquire_record(), fills its field and call
              \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
              the kernel calls \p fc_process() function of the container passing two iterators pointing to
              the begin and the end of publication list (see \ref iterator class). The iterators allow
              multiple pass through active records of publication list. For each processed record the container
              should call \p operation_done() function. On the end, the container should release
              its record by \p release_record().
        */
        template <
            typename PublicationRecord
            ,typename Traits = traits
        >
        class kernel
        {
        public:
            typedef PublicationRecord   publication_record_type;   ///< publication record type
            typedef Traits   traits;                               ///< Type traits
            typedef typename traits::lock_type global_lock_type;   ///< Global lock type
            typedef typename traits::back_off  back_off;           ///< back-off strategy type
            typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
            typedef typename traits::stat      stat;               ///< Internal statistics
            typedef typename traits::memory_model memory_model;    ///< C++ memory model

        protected:
            //@cond
            typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
            typedef std::lock_guard<global_lock_type> lock_guard;
            //@endcond

        protected:
            atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
            publication_record_type *   m_pHead;    ///< Head of publication list; never removed while the kernel lives
            boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record (freed via tls_cleanup on thread exit)
            mutable global_lock_type    m_Mutex;    ///< Global mutex; its holder is the combiner
            mutable stat                m_Stat;     ///< Internal statistics
            unsigned int const          m_nCompactFactor; ///< Publication list compacting factor, stored as a binary mask (the list is compacted every \p %m_nCompactFactor + 1 combining passes)
            unsigned int const          m_nCombinePassCount; ///< Number of combining passes per combiner
272
273         public:
            /// Initializes the object
            /**
                Compact factor = 64

                Combiner pass count = 8
            */
            kernel()
                : m_nCount(0)
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )   // tls_cleanup runs when an owner thread terminates
                , m_nCompactFactor( 64 - 1 ) // binary mask: compact factor must be a power of 2
                , m_nCombinePassCount( 8 )
            {
                init();
            }
289
            /// Initializes the object
            kernel(
                unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes); rounded up to a power of 2
                ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
                )
                : m_nCount(0)
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )   // tls_cleanup runs when an owner thread terminates
                , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                , m_nCombinePassCount( nCombinePassCount )
            {
                init();
            }
303
            /// Destroys the objects and mark all publication records as inactive
            /**
                Records in \p removed state belong to terminated threads and are freed here
                (otherwise they would leak); records still owned by live threads are detached
                (\p pOwner = nullptr) so that \p tls_cleanup frees them on thread exit.
            */
            ~kernel()
            {
                // mark all publication record as detached
                for ( publication_record * p = m_pHead; p; ) {
                    p->pOwner = nullptr;

                    publication_record * pRec = p;
                    p = p->pNext.load( memory_model::memory_order_relaxed );
                    if ( pRec->nState.load( memory_model::memory_order_relaxed ) == removed )
                        free_publication_record( static_cast<publication_record_type *>( pRec ));
                }
            }
317
318             /// Gets publication list record for the current thread
319             /**
320                 If there is no publication record for the current thread
321                 the function allocates it.
322             */
323             publication_record_type * acquire_record()
324             {
325                 publication_record_type * pRec = m_pThreadRec.get();
326                 if ( !pRec ) {
327                     // Allocate new publication record
328                     pRec = cxx11_allocator().New();
329                     pRec->pOwner = reinterpret_cast<void *>( this );
330                     m_pThreadRec.reset( pRec );
331                     m_Stat.onCreatePubRecord();
332                 }
333
334                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
335                     publish( pRec );
336
337                 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
338
339                 m_Stat.onAcquirePubRecord();
340                 return pRec;
341             }
342
            /// Marks publication record for the current thread as empty
            /**
                Must be called after the operation is done (\p pRec->is_done() holds);
                the record stays in the publication list for reuse by this thread.
            */
            void release_record( publication_record_type * pRec )
            {
                assert( pRec->is_done() );
                pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
                m_Stat.onReleasePubRecord();
            }
350
            /// Trying to execute operation \p nOpId
            /**
                \p pRec is the publication record acquiring by \ref acquire_record earlier.
                \p owner is a container that is owner of flat combining kernel object.
                As a result the current thread can become a combiner or can wait for
                another combiner performs \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_apply
                for each active non-empty publication record.
            */
            template <class Container>
            void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                // The release store publishes the operation (and its operands, written
                // by the container before this call) to the combiner thread
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_combining( owner, pRec );
            }
373
            /// Trying to execute operation \p nOpId in batch-combine mode
            /**
                \p pRec is the publication record acquiring by \ref acquire_record earlier.
                \p owner is a container that owns flat combining kernel object.
                As a result the current thread can become a combiner or can wait for
                another combiner performs \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_process
                giving the container the full access over publication list. This function
                is useful for an elimination technique if the container supports any kind of
                that. The container can perform multiple pass through publication list.

                \p owner.fc_process has two arguments - forward iterators on begin and end of
                publication list, see \ref iterator class. For each processed record the container
                should call \ref operation_done function to mark the record as processed.

                On the end of \p %batch_combine the \ref combine function is called
                to process rest of publication records.
            */
            template <class Container>
            void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                // Release store publishes the operation and its operands to the combiner
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_batch_combining( owner, pRec );
            }
405
            /// Waits for end of combining
            /**
                Blocks until the current combiner (if any) releases the global lock,
                then immediately releases it again.
            */
            void wait_while_combining() const
            {
                lock_guard l( m_Mutex );
            }
411
            /// Marks \p rec as executed
            /**
                This function should be called by container if batch_combine mode is used.
                For usual combining (see \ref combine) this function is excess.
                The release store makes the result visible to the waiting owner thread.
            */
            void operation_done( publication_record& rec )
            {
                rec.nRequest.store( req_Response, memory_model::memory_order_release );
            }
421
            /// Internal statistics (read-only access)
            stat const& statistics() const
            {
                return m_Stat;
            }
427
            //@cond
            // For container classes based on flat combining: mutable access so the
            // container can bump counters (m_Stat is declared mutable)
            stat& internal_statistics() const
            {
                return m_Stat;
            }
            //@endcond
435
            /// Returns the compact factor (m_nCompactFactor is stored as mask = factor - 1)
            unsigned int compact_factor() const
            {
                return m_nCompactFactor + 1;
            }
441
            /// Returns number of combining passes for combiner thread
            unsigned int combine_pass_count() const
            {
                return m_nCombinePassCount;
            }
447
448         public:
            /// Publication list iterator
            /**
                Iterators are intended for batch processing by container's
                \p fc_process function.
                The iterator allows iterate through active publication list.
                Records that are not \p active or hold no pending operation are skipped.
            */
            class iterator
            {
                //@cond
                friend class kernel;
                publication_record_type * m_pRec;   // current record; nullptr means end iterator
                //@endcond

            protected:
                //@cond
                // Constructed by kernel::begin(); positions on the first eligible record
                iterator( publication_record_type * pRec )
                    : m_pRec( pRec )
                {
                    skip_inactive();
                }

                // Advances m_pRec past records that are inactive or have no pending
                // operation (nRequest < req_Operation, i.e. empty or already answered)
                void skip_inactive()
                {
                    while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
                                    || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
                    {
                        m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    }
                }
                //@endcond

            public:
                /// Initializes an empty iterator object (the end iterator)
                iterator()
                    : m_pRec( nullptr )
                {}

                /// Copy ctor
                iterator( iterator const& src )
                    : m_pRec( src.m_pRec )
                {}

                /// Pre-increment: moves to the next active record with a pending operation
                iterator& operator++()
                {
                    assert( m_pRec );
                    m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    skip_inactive();
                    return *this;
                }

                /// Post-increment
                iterator operator++(int)
                {
                    assert( m_pRec );
                    iterator it(*this);
                    ++(*this);
                    return it;
                }

                /// Dereference operator, can return \p nullptr
                publication_record_type * operator ->()
                {
                    return m_pRec;
                }

                /// Dereference operator, the iterator should not be an end iterator
                publication_record_type& operator*()
                {
                    assert( m_pRec );
                    return *m_pRec;
                }

                /// Iterator equality
                friend bool operator==( iterator it1, iterator it2 )
                {
                    return it1.m_pRec == it2.m_pRec;
                }

                /// Iterator inequality
                friend bool operator!=( iterator it1, iterator it2 )
                {
                    return !( it1 == it2 );
                }
            };
534
            /// Returns an iterator to the first active publication record
            iterator begin()    { return iterator(m_pHead); }

            /// Returns an iterator to the end of publication list. Should not be dereferenced.
            iterator end()      { return iterator(); }
540
541         private:
542             //@cond
            // Called by boost::thread_specific_ptr when the owner thread terminates.
            // The record must be excluded from the publication list:
            // - if the kernel is alive and the record is linked (active), it is only
            //   marked 'removed'; a future combiner (or the kernel dtor) frees it,
            //   since unlinking here would race with concurrent list traversal;
            // - otherwise (never published, or kernel already destroyed) it can be
            //   freed immediately.
            static void tls_cleanup( publication_record_type * pRec )
            {
                // Thread done
                // pRec that is TLS data should be excluded from publication list
                if ( pRec ) {
                    if ( pRec->pOwner && pRec->nState.load(memory_model::memory_order_relaxed) == active ) {
                        // record is active and kernel is alive
                        unsigned int nState = active;
                        // CAS guards against a concurrent state change by a combiner
                        pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
                    }
                    else {
                        // record is not in publication list or kernel already deleted
                        free_publication_record( pRec );
                    }
                }
            }
559
            // Destroys and deallocates a publication record via the configured allocator
            static void free_publication_record( publication_record_type * pRec )
            {
                cxx11_allocator().Delete( pRec );
            }
564
            // Creates the head record of the publication list.
            // The head is owned by the constructing thread (stored in its TLS slot)
            // and is never removed while the kernel lives.
            void init()
            {
                assert( m_pThreadRec.get() == nullptr );
                publication_record_type * pRec = cxx11_allocator().New();
                m_pHead = pRec;
                pRec->pOwner = this;
                m_pThreadRec.reset( pRec );
                m_Stat.onCreatePubRecord();
            }
574
            // Activates an inactive record and links it into the publication list.
            // The record is inserted right after the head with a CAS loop, so the head
            // itself never changes and never needs to be re-linked.
            void publish( publication_record_type * pRec )
            {
                assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );

                // Refresh the age so the record survives the next compacting pass
                pRec->nAge.store( m_nCount.load(memory_model::memory_order_acquire), memory_model::memory_order_release );
                pRec->nState.store( active, memory_model::memory_order_release );

                // Insert record to publication list
                if ( m_pHead != static_cast<publication_record *>(pRec) ) {
                    publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
                    if ( p != static_cast<publication_record *>( pRec )) {
                        do {
                            pRec->pNext = p;
                            // Failed CAS changes p
                        } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
                        m_Stat.onActivatPubRecord();
                    }
                }
            }
595
596             void republish( publication_record_type * pRec )
597             {
598                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
599                     // The record has been excluded from publication list. Reinsert it
600                     publish( pRec );
601                 }
602             }
603
604             template <class Container>
605             void try_combining( Container& owner, publication_record_type * pRec )
606             {
607                 if ( m_Mutex.try_lock() ) {
608                     // The thread becomes a combiner
609                     lock_guard l( m_Mutex, std::adopt_lock_t() );
610
611                     // The record pRec can be excluded from publication list. Re-publish it
612                     republish( pRec );
613
614                     combining( owner );
615                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
616                 }
617                 else {
618                     // There is another combiner, wait while it executes our request
619                     if ( !wait_for_combining( pRec ) ) {
620                         // The thread becomes a combiner
621                         lock_guard l( m_Mutex, std::adopt_lock_t() );
622
623                         // The record pRec can be excluded from publication list. Re-publish it
624                         republish( pRec );
625
626                         combining( owner );
627                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
628                     }
629                 }
630             }
631
632             template <class Container>
633             void try_batch_combining( Container& owner, publication_record_type * pRec )
634             {
635                 if ( m_Mutex.try_lock() ) {
636                     // The thread becomes a combiner
637                     lock_guard l( m_Mutex, std::adopt_lock_t() );
638
639                     // The record pRec can be excluded from publication list. Re-publish it
640                     republish( pRec );
641
642                     batch_combining( owner );
643                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
644                 }
645                 else {
646                     // There is another combiner, wait while it executes our request
647                     if ( !wait_for_combining( pRec ) ) {
648                         // The thread becomes a combiner
649                         lock_guard l( m_Mutex, std::adopt_lock_t() );
650
651                         // The record pRec can be excluded from publication list. Re-publish it
652                         republish( pRec );
653
654                         batch_combining( owner );
655                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
656                     }
657                 }
658             }
659
660             template <class Container>
661             void combining( Container& owner )
662             {
663                 // The thread is a combiner
664                 assert( !m_Mutex.try_lock() );
665
666                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
667
668                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
669                     if ( !combining_pass( owner, nCurAge ))
670                         break;
671
672                 m_Stat.onCombining();
673                 if ( (nCurAge & m_nCompactFactor) == 0 )
674                     compact_list( nCurAge );
675             }
676
677             template <class Container>
678             bool combining_pass( Container& owner, unsigned int nCurAge )
679             {
680                 publication_record * pPrev = nullptr;
681                 publication_record * p = m_pHead;
682                 bool bOpDone = false;
683                 while ( p ) {
684                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
685                         case active:
686                             if ( p->op() >= req_Operation ) {
687                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
688                                 owner.fc_apply( static_cast<publication_record_type *>(p) );
689                                 operation_done( *p );
690                                 bOpDone = true;
691                             }
692                             break;
693                         case inactive:
694                             // Only m_pHead can be inactive in the publication list
695                             assert( p == m_pHead );
696                             break;
697                         case removed:
698                             // The record should be removed
699                             p = unlink_and_delete_record( pPrev, p );
700                             continue;
701                         default:
702                             /// ??? That is impossible
703                             assert(false);
704                     }
705                     pPrev = p;
706                     p = p->pNext.load( memory_model::memory_order_acquire );
707                 }
708                 return bOpDone;
709             }
710
711             template <class Container>
712             void batch_combining( Container& owner )
713             {
714                 // The thread is a combiner
715                 assert( !m_Mutex.try_lock() );
716
717                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
718
719                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
720                     owner.fc_process( begin(), end() );
721
722                 combining_pass( owner, nCurAge );
723                 m_Stat.onCombining();
724                 if ( (nCurAge & m_nCompactFactor) == 0 )
725                     compact_list( nCurAge );
726             }
727
            /// Passive wait: spins until the active combiner executes our request
            /**
                Returns \p true when the request in \p pRec has been completed
                (nRequest == req_Response) by the combiner.
                Returns \p false when this thread captured \p m_Mutex instead;
                in that case the mutex is HELD on return and the caller must
                become the combiner (see \p try_combining).
            */
            bool wait_for_combining( publication_record_type * pRec )
            {
                back_off bkoff;
                while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {

                    // The record can be excluded from publication list. Reinsert it
                    republish( pRec );

                    bkoff();

                    if ( m_Mutex.try_lock() ) {
                        if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
                            // The combiner has just served our request; the lock is not needed
                            m_Mutex.unlock();
                            break;
                        }
                        // The thread becomes a combiner; NOTE: m_Mutex stays locked
                        return false;
                    }
                }
                return true;
            }
749
            /// Evicts records that have not been active for m_nCompactFactor combining rounds
            /**
                Called by the combiner. A record is evicted when its stamped age
                lags the current age by more than the compact factor. The head
                record is never evicted here: eviction is done by unlinking from
                the predecessor (pPrev), so a record without a predecessor stays.
            */
            void compact_list( unsigned int const nCurAge )
            {
                // Thinning publication list
                publication_record * pPrev = nullptr;
                for ( publication_record * p = m_pHead; p; ) {
                    if ( p->nState.load( memory_model::memory_order_acquire ) == active
                      && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
                    {
                        if ( pPrev ) {
                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                            if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
                            {
                                // Unlinked; mark inactive so the owner thread re-publishes it
                                p->nState.store( inactive, memory_model::memory_order_release );
                                p = pNext;
                                m_Stat.onDeactivatePubRecord();
                                continue;
                            }
                            // CAS failed (a new record was inserted after pPrev
                            // concurrently) - keep the record and move on
                        }
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }

                m_Stat.onCompactPublicationList();
            }
776
            /// Unlinks record \p p (state == removed) from the publication list and frees it
            /**
                Called by the combiner during \p combining_pass.
                Returns the successor of \p p, from which the scan continues.

                If \p p is the head (pPrev == nullptr) the head pointer is simply
                advanced - only the combiner modifies m_pHead here. Otherwise the
                unlink is done with a CAS on pPrev->pNext, since new records can
                be inserted concurrently by publish().

                NOTE(review): when the CAS fails, \p p is neither unlinked nor
                freed on this pass; it stays in the "removed" state and is
                reclaimed on a later pass - confirm against publish()/compact_list().
            */
            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
            {
                if ( pPrev ) {
                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
                    {
                        free_publication_record( static_cast<publication_record_type *>( p ));
                        m_Stat.onDeletePubRecord();
                    }
                    return pNext;
                }
                else {
                    // p is the list head: advance the head and free the record
                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
                    free_publication_record( static_cast<publication_record_type *>( p ));
                    m_Stat.onDeletePubRecord();
                    return m_pHead;
                }
            }
796             //@endcond
797         };
798
799         //@cond
        // Stub container interface for the flat-combining kernel.
        // Both handlers deliberately assert: a real flat-combining container is
        // expected to supply its own fc_apply()/fc_process(); reaching one of
        // these stubs at run time means the container failed to do so.
        class container
        {
        public:
            // Per-record handler: applies the request published in one record.
            template <typename PubRecord>
            void fc_apply( PubRecord * )
            {
                assert( false );
            }

            // Batch handler: processes the [first, last) range of publication records.
            template <typename Iterator>
            void fc_process( Iterator, Iterator )
            {
                assert( false );
            }
        };
815         //@endcond
816
817     } // namespace flat_combining
818 }} // namespace cds::algo
819
820 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H