d5324c8cbac072560611c3a848c46016f1fe6e38
[libcds.git] / cds / algo / flat_combining.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_H
33
34 #include <mutex>
35 #include <cds/algo/atomic.h>
36 #include <cds/details/allocator.h>
37 #include <cds/algo/backoff_strategy.h>
38 #include <cds/sync/spinlock.h>
39 #include <cds/opt/options.h>
40 #include <cds/algo/int_algo.h>
41 #include <boost/thread/tss.hpp>     // thread_specific_ptr
42
43 namespace cds { namespace algo {
44
45     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
46     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
47
48     /// Flat combining
49     /**
50         @anchor cds_flat_combining_description
51         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
52         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
53         The technique converts a sequential data structure to its concurrent implementation.
54         A few structures are added to the sequential implementation: a <i>global lock</i>,
55         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
56         of a <i>publication list</i>. The publication list is a list of thread-local records
57         of a size proportional to the number of threads that are concurrently accessing the shared object.
58
59         Each thread \p t accessing the structure to perform an invocation of some method \p m
60         on the shared object executes the following sequence of steps:
61         <ol>
62         <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
63         sequentially to the shared object in the <i>request</i> field of your thread local publication
64         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
65         be used to receive the response. If your thread local publication record is marked as active
66         continue to step 2, otherwise continue to step 5.</li>
67         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
68         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
69         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
70         record is active. If your record is inactive proceed to step 5. Once the response is available,
71         reset the request field to null and return the response.</li>
72         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
73         return to spinning in step 2.</li>
74         <li>Otherwise, you hold the lock and are a combiner.
75         <ul>
76             <li>Increment the combining pass count by one.</li>
77             <li>Execute a \p fc_apply() by traversing the publication list from the head,
78             combining all nonnull method call invocations, setting the <i>age</i> of each of these records
79             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
80             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
81             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
82             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
83             by the head in the list), remove from the publication list all records whose <i>age</i> is
84             much smaller than the current <i>count</i>. This is done by removing the node and marking it
85             as inactive.</li>
86             <li>Release the lock.</li>
87         </ul>
88         <li>If you have no thread local publication record allocate one, marked as active. If you already
89         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
90         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
91         </ol>
92
93         As the test results show, the flat combining technique is suitable for non-intrusive containers
94         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
95         less impressive results.
96
97         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
98
99         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
100     */
101     namespace flat_combining {
102
103         /// Special values of publication_record::nRequest
104         enum request_value
105         {
106             req_EmptyRecord,    ///< Publication record is empty
107             req_Response,       ///< Operation is done
108
109             req_Operation       ///< First operation id for derived classes
110         };
111
112         /// publication_record state
113         enum record_state {
114             inactive,       ///< Record is inactive
115             active,         ///< Record is active
116             removed         ///< Record should be removed
117         };
118
119         /// Record of publication list
120         /**
121             Each data structure based on flat combining contains a class derived from \p %publication_record
122         */
123         struct publication_record {
124             atomics::atomic<unsigned int>    nRequest;   ///< Request field (depends on data structure)
125             atomics::atomic<unsigned int>    nState;     ///< Record state: inactive, active, removed
126             atomics::atomic<unsigned int>    nAge;       ///< Age of the record
127             atomics::atomic<publication_record *> pNext; ///< Next record in publication list
128             void *                              pOwner;    ///< [internal data] Pointer to \ref kernel object that manages the publication list
129
130             /// Initializes publication record
131             publication_record()
132                 : nRequest( req_EmptyRecord )
133                 , nState( inactive )
134                 , nAge(0)
135                 , pNext( nullptr )
136                 , pOwner( nullptr )
137             {}
138
139             /// Returns the value of \p nRequest field
140             unsigned int op() const
141             {
142                 return nRequest.load( atomics::memory_order_relaxed );
143             }
144
145             /// Checks if the operation is done
146             bool is_done() const
147             {
148                 return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
149             }
150         };
151
152         /// Flat combining internal statistics
153         template <typename Counter = cds::atomicity::event_counter >
154         struct stat
155         {
156             typedef Counter counter_type;   ///< Event counter type
157
158             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
159             counter_type    m_nCombiningCount   ;   ///< Combining call count
160             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
161             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
162             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
163             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
164             counter_type    m_nPubRecordDeteted ;   ///< Count of deleted publication records
165             counter_type    m_nAcquirePubRecCount;  ///< Count of acquiring publication record
166             counter_type    m_nReleasePubRecCount;  ///< Count on releasing publication record
167
168             /// Returns current combining factor
169             /**
170                 Combining factor is how many operations perform in one combine pass:
171                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
172             */
173             double combining_factor() const
174             {
175                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
176             }
177
178             //@cond
179             void    onOperation()               { ++m_nOperationCount; }
180             void    onCombining()               { ++m_nCombiningCount; }
181             void    onCompactPublicationList()  { ++m_nCompactPublicationList; }
182             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord; }
183             void    onActivatPubRecord()        { ++m_nActivatePubRecord; }
184             void    onCreatePubRecord()         { ++m_nPubRecordCreated; }
185             void    onDeletePubRecord()         { ++m_nPubRecordDeteted; }
186             void    onAcquirePubRecord()        { ++m_nAcquirePubRecCount; }
187             void    onReleasePubRecord()        { ++m_nReleasePubRecCount; }
188             //@endcond
189         };
190
191         /// Flat combining dummy internal statistics
192         struct empty_stat
193         {
194             //@cond
195             void    onOperation()               {}
196             void    onCombining()               {}
197             void    onCompactPublicationList()  {}
198             void    onDeactivatePubRecord()     {}
199             void    onActivatPubRecord()        {}
200             void    onCreatePubRecord()         {}
201             void    onDeletePubRecord()         {}
202             void    onAcquirePubRecord()        {}
203             void    onReleasePubRecord()        {}
204             //@endcond
205         };
206
207         /// Type traits of \ref kernel class
208         /**
209             You can define different type traits for \ref kernel
210             by specifying your struct based on \p %traits
211             or by using \ref make_traits metafunction.
212         */
213         struct traits
214         {
215             typedef cds::sync::spin             lock_type;  ///< Lock type
216             typedef cds::backoff::delay_of<2>   back_off;   ///< Back-off strategy
217             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating publication_record derivatives)
218             typedef empty_stat                  stat;       ///< Internal statistics
219             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
220         };
221
222         /// Metafunction converting option list to traits
223         /**
224             \p Options are:
225             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
226             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
227             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
228             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
229             - \p opt::memory_model - C++ memory ordering model.
230                 List of all available memory ordering see \p opt::memory_model.
231                 Default is \p cds::opt::v::relaxed_ordering
232         */
233         template <typename... Options>
234         struct make_traits {
235 #   ifdef CDS_DOXYGEN_INVOKED
236             typedef implementation_defined type ;   ///< Metafunction result
237 #   else
238             typedef typename cds::opt::make_options<
239                 typename cds::opt::find_type_traits< traits, Options... >::type
240                 ,Options...
241             >::type   type;
242 #   endif
243         };
244
245         /// The kernel of flat combining
246         /**
247             Template parameters:
248             - \p PublicationRecord - a type derived from \ref publication_record
249             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
250                 \ref make_traits metafunction can be used to create type traits
251
252             The kernel object should be a member of a container class. The container cooperates with flat combining
253             kernel object. There are two ways to interact with the kernel:
254             - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
255               the container acquires its publication record by \p acquire_record(), fills its fields and calls
256               \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
257               calls \p fc_apply() function of the container for each active non-empty record. Then, the container
258               should release its publication record by \p release_record(). Only one pass through the publication
259               list is possible.
260             - Batch processing - \p batch_combine() function. It this mode the container obtains access
261               to entire publication list. This mode allows the container to perform an elimination, for example,
262               the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
263               the container acquires its publication record by \p acquire_record(), fills its field and call
264               \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
265               the kernel calls \p fc_process() function of the container passing two iterators pointing to
266               the begin and the end of publication list (see \ref iterator class). The iterators allow
267               multiple pass through active records of publication list. For each processed record the container
268               should call \p operation_done() function. On the end, the container should release
269               its record by \p release_record().
270         */
271         template <
272             typename PublicationRecord
273             ,typename Traits = traits
274         >
275         class kernel
276         {
277         public:
278             typedef PublicationRecord   publication_record_type;   ///< publication record type
279             typedef Traits   traits;                               ///< Type traits
280             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
281             typedef typename traits::back_off  back_off;           ///< back-off strategy type
282             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
283             typedef typename traits::stat      stat;               ///< Internal statistics
284             typedef typename traits::memory_model memory_model;    ///< C++ memory model
285
286         protected:
287             //@cond
288             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
289             typedef std::lock_guard<global_lock_type> lock_guard;
290             //@endcond
291
292         protected:
293             atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
294             publication_record_type *   m_pHead;    ///< Head of publication list
295             boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record
296             mutable global_lock_type    m_Mutex;    ///< Global mutex
297             mutable stat                m_Stat;     ///< Internal statistics
298             unsigned int const          m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
299             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
300
301         public:
302             /// Initializes the object
303             /**
304                 Compact factor = 64
305
306                 Combiner pass count = 8
307             */
308             kernel()
309                 : m_nCount(0)
310                 , m_pHead( nullptr )
311                 , m_pThreadRec( tls_cleanup )
312                 , m_nCompactFactor( 64 - 1 ) // binary mask
313                 , m_nCombinePassCount( 8 )
314             {
315                 init();
316             }
317
318             /// Initializes the object
319             kernel(
320                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
321                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
322                 )
323                 : m_nCount(0)
324                 , m_pHead( nullptr )
325                 , m_pThreadRec( tls_cleanup )
326                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
327                 , m_nCombinePassCount( nCombinePassCount )
328             {
329                 init();
330             }
331
332             /// Destroys the objects and mark all publication records as inactive
333             ~kernel()
334             {
335                 // mark all publication record as detached
336                 for ( publication_record * p = m_pHead; p; ) {
337                     p->pOwner = nullptr;
338
339                     publication_record * pRec = p;
340                     p = p->pNext.load( memory_model::memory_order_relaxed );
341                     if ( pRec->nState.load( memory_model::memory_order_relaxed ) == removed )
342                         free_publication_record( static_cast<publication_record_type *>( pRec ));
343                 }
344             }
345
346             /// Gets publication list record for the current thread
347             /**
348                 If there is no publication record for the current thread
349                 the function allocates it.
350             */
351             publication_record_type * acquire_record()
352             {
353                 publication_record_type * pRec = m_pThreadRec.get();
354                 if ( !pRec ) {
355                     // Allocate new publication record
356                     pRec = cxx11_allocator().New();
357                     pRec->pOwner = reinterpret_cast<void *>( this );
358                     m_pThreadRec.reset( pRec );
359                     m_Stat.onCreatePubRecord();
360                 }
361
362                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
363                     publish( pRec );
364
365                 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
366
367                 m_Stat.onAcquirePubRecord();
368                 return pRec;
369             }
370
371             /// Marks publication record for the current thread as empty
372             void release_record( publication_record_type * pRec )
373             {
374                 assert( pRec->is_done() );
375                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
376                 m_Stat.onReleasePubRecord();
377             }
378
379             /// Trying to execute operation \p nOpId
380             /**
381                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
382                 \p owner is a container that is owner of flat combining kernel object.
383                 As a result the current thread can become a combiner or can wait for
384                 another combiner performs \p pRec operation.
385
386                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
387                 for each active non-empty publication record.
388             */
389             template <class Container>
390             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
391             {
392                 assert( nOpId >= req_Operation );
393                 assert( pRec );
394                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
395                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
396
397                 m_Stat.onOperation();
398
399                 try_combining( owner, pRec );
400             }
401
402             /// Trying to execute operation \p nOpId in batch-combine mode
403             /**
404                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
405                 \p owner is a container that owns flat combining kernel object.
406                 As a result the current thread can become a combiner or can wait for
407                 another combiner performs \p pRec operation.
408
409                 If the thread becomes a combiner, the kernel calls \p owner.fc_process
410                 giving the container the full access over publication list. This function
411                 is useful for an elimination technique if the container supports any kind of
412                 that. The container can perform multiple pass through publication list.
413
414                 \p owner.fc_process has two arguments - forward iterators on begin and end of
415                 publication list, see \ref iterator class. For each processed record the container
416                 should call \ref operation_done function to mark the record as processed.
417
418                 On the end of \p %batch_combine the \ref combine function is called
419                 to process rest of publication records.
420             */
421             template <class Container>
422             void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
423             {
424                 assert( nOpId >= req_Operation );
425                 assert( pRec );
426                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
427                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
428
429                 m_Stat.onOperation();
430
431                 try_batch_combining( owner, pRec );
432             }
433
434             /// Waits for end of combining
435             void wait_while_combining() const
436             {
437                 lock_guard l( m_Mutex );
438             }
439
440             /// Marks \p rec as executed
441             /**
442                 This function should be called by container if batch_combine mode is used.
443                 For usual combining (see \ref combine) this function is excess.
444             */
445             void operation_done( publication_record& rec )
446             {
447                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
448             }
449
450             /// Internal statistics
451             stat const& statistics() const
452             {
453                 return m_Stat;
454             }
455
456             //@cond
457             // For container classes based on flat combining
458             stat& internal_statistics() const
459             {
460                 return m_Stat;
461             }
462             //@endcond
463
464             /// Returns the compact factor
465             unsigned int compact_factor() const
466             {
467                 return m_nCompactFactor + 1;
468             }
469
470             /// Returns number of combining passes for combiner thread
471             unsigned int combine_pass_count() const
472             {
473                 return m_nCombinePassCount;
474             }
475
476         public:
477             /// Publication list iterator
478             /**
479                 Iterators are intended for batch processing by container's
480                 \p fc_process function.
481                 The iterator allows iterate through active publication list.
482             */
483             class iterator
484             {
485                 //@cond
486                 friend class kernel;
487                 publication_record_type * m_pRec;
488                 //@endcond
489
490             protected:
491                 //@cond
492                 iterator( publication_record_type * pRec )
493                     : m_pRec( pRec )
494                 {
495                     skip_inactive();
496                 }
497
498                 void skip_inactive()
499                 {
500                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
501                                     || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
502                     {
503                         m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
504                     }
505                 }
506                 //@endcond
507
508             public:
509                 /// Initializes an empty iterator object
510                 iterator()
511                     : m_pRec( nullptr )
512                 {}
513
514                 /// Copy ctor
515                 iterator( iterator const& src )
516                     : m_pRec( src.m_pRec )
517                 {}
518
519                 /// Pre-increment
520                 iterator& operator++()
521                 {
522                     assert( m_pRec );
523                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
524                     skip_inactive();
525                     return *this;
526                 }
527
528                 /// Post-increment
529                 iterator operator++(int)
530                 {
531                     assert( m_pRec );
532                     iterator it(*this);
533                     ++(*this);
534                     return it;
535                 }
536
537                 /// Dereference operator, can return \p nullptr
538                 publication_record_type * operator ->()
539                 {
540                     return m_pRec;
541                 }
542
543                 /// Dereference operator, the iterator should not be an end iterator
544                 publication_record_type& operator*()
545                 {
546                     assert( m_pRec );
547                     return *m_pRec;
548                 }
549
550                 /// Iterator equality
551                 friend bool operator==( iterator it1, iterator it2 )
552                 {
553                     return it1.m_pRec == it2.m_pRec;
554                 }
555
556                 /// Iterator inequality
557                 friend bool operator!=( iterator it1, iterator it2 )
558                 {
559                     return !( it1 == it2 );
560                 }
561             };
562
563             /// Returns an iterator to the first active publication record
564             iterator begin()    { return iterator(m_pHead); }
565
566             /// Returns an iterator to the end of publication list. Should not be dereferenced.
567             iterator end()      { return iterator(); }
568
569         private:
570             //@cond
571             static void tls_cleanup( publication_record_type * pRec )
572             {
573                 // Thread done
574                 // pRec that is TLS data should be excluded from publication list
575                 if ( pRec ) {
576                     if ( pRec->pOwner && pRec->nState.load(memory_model::memory_order_relaxed) == active ) {
577                         // record is active and kernel is alive
578                         unsigned int nState = active;
579                         pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
580                     }
581                     else {
582                         // record is not in publication list or kernel already deleted
583                         free_publication_record( pRec );
584                     }
585                 }
586             }
587
588             static void free_publication_record( publication_record_type * pRec )
589             {
590                 cxx11_allocator().Delete( pRec );
591             }
592
593             void init()
594             {
595                 assert( m_pThreadRec.get() == nullptr );
596                 publication_record_type * pRec = cxx11_allocator().New();
597                 m_pHead = pRec;
598                 pRec->pOwner = this;
599                 m_pThreadRec.reset( pRec );
600                 m_Stat.onCreatePubRecord();
601             }
602
603             void publish( publication_record_type * pRec )
604             {
605                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
606
607                 pRec->nAge.store( m_nCount.load(memory_model::memory_order_acquire), memory_model::memory_order_release );
608                 pRec->nState.store( active, memory_model::memory_order_release );
609
610                 // Insert record to publication list
611                 if ( m_pHead != static_cast<publication_record *>(pRec) ) {
612                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
613                     if ( p != static_cast<publication_record *>( pRec )) {
614                         do {
615                             pRec->pNext = p;
616                             // Failed CAS changes p
617                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
618                             memory_model::memory_order_release, atomics::memory_order_relaxed ));
619                         m_Stat.onActivatPubRecord();
620                     }
621                 }
622             }
623
624             void republish( publication_record_type * pRec )
625             {
626                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
627                     // The record has been excluded from publication list. Reinsert it
628                     publish( pRec );
629                 }
630             }
631
632             template <class Container>
633             void try_combining( Container& owner, publication_record_type * pRec )
634             {
635                 if ( m_Mutex.try_lock() ) {
636                     // The thread becomes a combiner
637                     lock_guard l( m_Mutex, std::adopt_lock_t() );
638
639                     // The record pRec can be excluded from publication list. Re-publish it
640                     republish( pRec );
641
642                     combining( owner );
643                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
644                 }
645                 else {
646                     // There is another combiner, wait while it executes our request
647                     if ( !wait_for_combining( pRec ) ) {
648                         // The thread becomes a combiner
649                         lock_guard l( m_Mutex, std::adopt_lock_t() );
650
651                         // The record pRec can be excluded from publication list. Re-publish it
652                         republish( pRec );
653
654                         combining( owner );
655                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
656                     }
657                 }
658             }
659
660             template <class Container>
661             void try_batch_combining( Container& owner, publication_record_type * pRec )
662             {
663                 if ( m_Mutex.try_lock() ) {
664                     // The thread becomes a combiner
665                     lock_guard l( m_Mutex, std::adopt_lock_t() );
666
667                     // The record pRec can be excluded from publication list. Re-publish it
668                     republish( pRec );
669
670                     batch_combining( owner );
671                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
672                 }
673                 else {
674                     // There is another combiner, wait while it executes our request
675                     if ( !wait_for_combining( pRec ) ) {
676                         // The thread becomes a combiner
677                         lock_guard l( m_Mutex, std::adopt_lock_t() );
678
679                         // The record pRec can be excluded from publication list. Re-publish it
680                         republish( pRec );
681
682                         batch_combining( owner );
683                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
684                     }
685                 }
686             }
687
688             template <class Container>
689             void combining( Container& owner )
690             {
691                 // The thread is a combiner
692                 assert( !m_Mutex.try_lock() );
693
694                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
695
696                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
697                     if ( !combining_pass( owner, nCurAge ))
698                         break;
699
700                 m_Stat.onCombining();
701                 if ( (nCurAge & m_nCompactFactor) == 0 )
702                     compact_list( nCurAge );
703             }
704
705             template <class Container>
706             bool combining_pass( Container& owner, unsigned int nCurAge )
707             {
708                 publication_record * pPrev = nullptr;
709                 publication_record * p = m_pHead;
710                 bool bOpDone = false;
711                 while ( p ) {
712                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
713                         case active:
714                             if ( p->op() >= req_Operation ) {
715                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
716                                 owner.fc_apply( static_cast<publication_record_type *>(p) );
717                                 operation_done( *p );
718                                 bOpDone = true;
719                             }
720                             break;
721                         case inactive:
722                             // Only m_pHead can be inactive in the publication list
723                             assert( p == m_pHead );
724                             break;
725                         case removed:
726                             // The record should be removed
727                             p = unlink_and_delete_record( pPrev, p );
728                             continue;
729                         default:
730                             /// ??? That is impossible
731                             assert(false);
732                     }
733                     pPrev = p;
734                     p = p->pNext.load( memory_model::memory_order_acquire );
735                 }
736                 return bOpDone;
737             }
738
739             template <class Container>
740             void batch_combining( Container& owner )
741             {
742                 // The thread is a combiner
743                 assert( !m_Mutex.try_lock() );
744
745                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
746
747                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
748                     owner.fc_process( begin(), end() );
749
750                 combining_pass( owner, nCurAge );
751                 m_Stat.onCombining();
752                 if ( (nCurAge & m_nCompactFactor) == 0 )
753                     compact_list( nCurAge );
754             }
755
756             bool wait_for_combining( publication_record_type * pRec )
757             {
758                 back_off bkoff;
759                 while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {
760
761                     // The record can be excluded from publication list. Reinsert it
762                     republish( pRec );
763
764                     bkoff();
765
766                     if ( m_Mutex.try_lock() ) {
767                         if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
768                             m_Mutex.unlock();
769                             break;
770                         }
771                         // The thread becomes a combiner
772                         return false;
773                     }
774                 }
775                 return true;
776             }
777
778             void compact_list( unsigned int const nCurAge )
779             {
780                 // Thinning publication list
781                 publication_record * pPrev = nullptr;
782                 for ( publication_record * p = m_pHead; p; ) {
783                     if ( p->nState.load( memory_model::memory_order_acquire ) == active
784                       && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
785                     {
786                         if ( pPrev ) {
787                             publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
788                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
789                                 memory_model::memory_order_release, atomics::memory_order_relaxed ))
790                             {
791                                 p->nState.store( inactive, memory_model::memory_order_release );
792                                 p = pNext;
793                                 m_Stat.onDeactivatePubRecord();
794                                 continue;
795                             }
796                         }
797                     }
798                     pPrev = p;
799                     p = p->pNext.load( memory_model::memory_order_acquire );
800                 }
801
802                 m_Stat.onCompactPublicationList();
803             }
804
805             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
806             {
807                 if ( pPrev ) {
808                     publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
809                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
810                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
811                     {
812                         free_publication_record( static_cast<publication_record_type *>( p ));
813                         m_Stat.onDeletePubRecord();
814                     }
815                     return pNext;
816                 }
817                 else {
818                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
819                     free_publication_record( static_cast<publication_record_type *>( p ));
820                     m_Stat.onDeletePubRecord();
821                     return m_pHead;
822                 }
823             }
824             //@endcond
825         };
826
827         //@cond
828         class container
829         {
830         public:
831             template <typename PubRecord>
832             void fc_apply( PubRecord * )
833             {
834                 assert( false );
835             }
836
837             template <typename Iterator>
838             void fc_process( Iterator, Iterator )
839             {
840                 assert( false );
841             }
842         };
843         //@endcond
844
845     } // namespace flat_combining
846 }} // namespace cds::algo
847
848 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H