Merge branch 'dev'
[libcds.git] / cds / algo / flat_combining.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8     
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.     
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_H
33
34 #include <mutex>
35 #include <cds/algo/atomic.h>
36 #include <cds/details/allocator.h>
37 #include <cds/algo/backoff_strategy.h>
38 #include <cds/sync/spinlock.h>
39 #include <cds/opt/options.h>
40 #include <cds/algo/int_algo.h>
41 #include <boost/thread/tss.hpp>     // thread_specific_ptr
42
43 namespace cds { namespace algo {
44
45     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
46     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
47
48     /// Flat combining
49     /**
50         @anchor cds_flat_combining_description
51         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
52         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
53         The technique converts a sequential data structure to its concurrent implementation.
54         A few structures are added to the sequential implementation: a <i>global lock</i>,
55         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
56         of a <i>publication list</i>. The publication list is a list of thread-local records
57         of a size proportional to the number of threads that are concurrently accessing the shared object.
58
59         Each thread \p t accessing the structure to perform an invocation of some method \p m
60         on the shared object executes the following sequence of steps:
61         <ol>
62         <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
63         sequentially to the shared object in the <i>request</i> field of your thread local publication
64         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
65         be used to receive the response. If your thread local publication record is marked as active
66         continue to step 2, otherwise continue to step 5.</li>
67         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
68         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
69         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
70         record is active. If your record is inactive proceed to step 5. Once the response is available,
71         reset the request field to null and return the response.</li>
72         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
73         return to spinning in step 2.</li>
74         <li>Otherwise, you hold the lock and are a combiner.
75         <ul>
76             <li>Increment the combining pass count by one.</li>
77             <li>Execute a \p fc_apply() by traversing the publication list from the head,
78             combining all nonnull method call invocations, setting the <i>age</i> of each of these records
79             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
80             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
81             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
82             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
83             by the head in the list), remove from the publication list all records whose <i>age</i> is
84             much smaller than the current <i>count</i>. This is done by removing the node and marking it
85             as inactive.</li>
86             <li>Release the lock.</li>
87         </ul>
88         <li>If you have no thread local publication record allocate one, marked as active. If you already
89         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
90         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
91         </ol>
92
93         As the test results show, the flat combining technique is suitable for non-intrusive containers
94         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
95         less impressive results.
96
97         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
98
99         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
100     */
101     namespace flat_combining {
102
103         /// Special values of publication_record::nRequest
104         enum request_value
105         {
106             req_EmptyRecord,    ///< Publication record is empty
107             req_Response,       ///< Operation is done
108
109             req_Operation       ///< First operation id for derived classes
110         };
111
112         /// publication_record state
113         enum record_state {
114             inactive,       ///< Record is inactive
115             active,         ///< Record is active
116             removed         ///< Record should be removed
117         };
118
119         /// Record of publication list
120         /**
121             Each data structure based on flat combining contains a class derived from \p %publication_record
122         */
123         struct publication_record {
124             atomics::atomic<unsigned int>    nRequest;   ///< Request field (depends on data structure)
125             atomics::atomic<unsigned int>    nState;     ///< Record state: inactive, active, removed
126             atomics::atomic<unsigned int>    nAge;       ///< Age of the record
127             atomics::atomic<publication_record *> pNext; ///< Next record in publication list
128             void *                              pOwner;    ///< [internal data] Pointer to \ref kernel object that manages the publication list
129
130             /// Initializes publication record
131             publication_record()
132                 : nRequest( req_EmptyRecord )
133                 , nState( inactive )
134                 , nAge(0)
135                 , pNext( nullptr )
136                 , pOwner( nullptr )
137             {}
138
139             /// Returns the value of \p nRequest field
140             unsigned int op() const
141             {
142                 return nRequest.load( atomics::memory_order_relaxed );
143             }
144
145             /// Checks if the operation is done
146             bool is_done() const
147             {
148                 return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
149             }
150         };
151
152         /// Flat combining internal statistics
153         template <typename Counter = cds::atomicity::event_counter >
154         struct stat
155         {
156             typedef Counter counter_type;   ///< Event counter type
157
158             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
159             counter_type    m_nCombiningCount   ;   ///< Combining call count
160             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
161             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
162             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
163             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
164             counter_type    m_nPubRecordDeteted ;   ///< Count of deleted publication records
165             counter_type    m_nAcquirePubRecCount;  ///< Count of acquiring publication record
166             counter_type    m_nReleasePubRecCount;  ///< Count on releasing publication record
167
168             /// Returns current combining factor
169             /**
170                 Combining factor is how many operations perform in one combine pass:
171                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
172             */
173             double combining_factor() const
174             {
175                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
176             }
177
178             //@cond
179             void    onOperation()               { ++m_nOperationCount; }
180             void    onCombining()               { ++m_nCombiningCount; }
181             void    onCompactPublicationList()  { ++m_nCompactPublicationList; }
182             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord; }
183             void    onActivatePubRecord()       { ++m_nActivatePubRecord; }
184             void    onCreatePubRecord()         { ++m_nPubRecordCreated; }
185             void    onDeletePubRecord()         { ++m_nPubRecordDeteted; }
186             void    onAcquirePubRecord()        { ++m_nAcquirePubRecCount; }
187             void    onReleasePubRecord()        { ++m_nReleasePubRecCount; }
188             //@endcond
189         };
190
191         /// Flat combining dummy internal statistics
192         struct empty_stat
193         {
194             //@cond
195             void    onOperation()               {}
196             void    onCombining()               {}
197             void    onCompactPublicationList()  {}
198             void    onDeactivatePubRecord()     {}
199             void    onActivatePubRecord()       {}
200             void    onCreatePubRecord()         {}
201             void    onDeletePubRecord()         {}
202             void    onAcquirePubRecord()        {}
203             void    onReleasePubRecord()        {}
204             //@endcond
205         };
206
207         /// Type traits of \ref kernel class
208         /**
209             You can define different type traits for \ref kernel
210             by specifying your struct based on \p %traits
211             or by using \ref make_traits metafunction.
212         */
213         struct traits
214         {
215             typedef cds::sync::spin             lock_type;  ///< Lock type
216             typedef cds::backoff::delay_of<2>   back_off;   ///< Back-off strategy
217             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating publication_record derivatives)
218             typedef empty_stat                  stat;       ///< Internal statistics
219             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
220         };
221
222         /// Metafunction converting option list to traits
223         /**
224             \p Options are:
225             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
226             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
227             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
228             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
229             - \p opt::memory_model - C++ memory ordering model.
230                 List of all available memory ordering see \p opt::memory_model.
231                 Default is \p cds::opt::v::relaxed_ordering
232         */
233         template <typename... Options>
234         struct make_traits {
235 #   ifdef CDS_DOXYGEN_INVOKED
236             typedef implementation_defined type ;   ///< Metafunction result
237 #   else
238             typedef typename cds::opt::make_options<
239                 typename cds::opt::find_type_traits< traits, Options... >::type
240                 ,Options...
241             >::type   type;
242 #   endif
243         };
244
245         /// The kernel of flat combining
246         /**
247             Template parameters:
248             - \p PublicationRecord - a type derived from \ref publication_record
249             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
250                 \ref make_traits metafunction can be used to create type traits
251
252             The kernel object should be a member of a container class. The container cooperates with flat combining
253             kernel object. There are two ways to interact with the kernel:
254             - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
255               the container acquires its publication record by \p acquire_record(), fills its fields and calls
256               \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
257               calls \p fc_apply() function of the container for each active non-empty record. Then, the container
258               should release its publication record by \p release_record(). Only one pass through the publication
259               list is possible.
260             - Batch processing - \p batch_combine() function. It this mode the container obtains access
261               to entire publication list. This mode allows the container to perform an elimination, for example,
262               the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
263               the container acquires its publication record by \p acquire_record(), fills its field and call
264               \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
265               the kernel calls \p fc_process() function of the container passing two iterators pointing to
266               the begin and the end of publication list (see \ref iterator class). The iterators allow
267               multiple pass through active records of publication list. For each processed record the container
268               should call \p operation_done() function. On the end, the container should release
269               its record by \p release_record().
270         */
271         template <
272             typename PublicationRecord
273             ,typename Traits = traits
274         >
275         class kernel
276         {
277         public:
278             typedef PublicationRecord   publication_record_type;   ///< publication record type
279             typedef Traits   traits;                               ///< Type traits
280             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
281             typedef typename traits::back_off  back_off;           ///< back-off strategy type
282             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
283             typedef typename traits::stat      stat;               ///< Internal statistics
284             typedef typename traits::memory_model memory_model;    ///< C++ memory model
285
286         protected:
287             //@cond
288             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
289             typedef std::lock_guard<global_lock_type> lock_guard;
290             //@endcond
291
292         protected:
293             atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
294             publication_record_type *   m_pHead;    ///< Head of publication list
295             boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record
296             mutable global_lock_type    m_Mutex;    ///< Global mutex
297             mutable stat                m_Stat;     ///< Internal statistics
298             unsigned int const          m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
299             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
300
301         public:
302             /// Initializes the object
303             /**
304                 Compact factor = 64
305
306                 Combiner pass count = 8
307             */
308             kernel()
309                 : m_nCount(0)
310                 , m_pHead( nullptr )
311                 , m_pThreadRec( tls_cleanup )
312                 , m_nCompactFactor( 64 - 1 ) // binary mask
313                 , m_nCombinePassCount( 8 )
314             {
315                 init();
316             }
317
318             /// Initializes the object
319             kernel(
320                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
321                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
322                 )
323                 : m_nCount(0)
324                 , m_pHead( nullptr )
325                 , m_pThreadRec( tls_cleanup )
326                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
327                 , m_nCombinePassCount( nCombinePassCount )
328             {
329                 init();
330             }
331
332             /// Destroys the objects and mark all publication records as inactive
333             ~kernel()
334             {
335                 // mark all publication record as detached
336                 for ( publication_record * p = m_pHead; p; ) {
337                     p->pOwner = nullptr;
338
339                     publication_record * pRec = p;
340                     p = p->pNext.load( memory_model::memory_order_relaxed );
341                     if ( pRec->nState.load( memory_model::memory_order_acquire ) == removed )
342                         free_publication_record( static_cast<publication_record_type *>( pRec ));
343                 }
344             }
345
346             /// Gets publication list record for the current thread
347             /**
348                 If there is no publication record for the current thread
349                 the function allocates it.
350             */
351             publication_record_type * acquire_record()
352             {
353                 publication_record_type * pRec = m_pThreadRec.get();
354                 if ( !pRec ) {
355                     // Allocate new publication record
356                     pRec = cxx11_allocator().New();
357                     pRec->pOwner = reinterpret_cast<void *>( this );
358                     m_pThreadRec.reset( pRec );
359                     m_Stat.onCreatePubRecord();
360                 }
361
362                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
363                     publish( pRec );
364
365                 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
366
367                 m_Stat.onAcquirePubRecord();
368                 return pRec;
369             }
370
371             /// Marks publication record for the current thread as empty
372             void release_record( publication_record_type * pRec )
373             {
374                 assert( pRec->is_done() );
375                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
376                 m_Stat.onReleasePubRecord();
377             }
378
379             /// Trying to execute operation \p nOpId
380             /**
381                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
382                 \p owner is a container that is owner of flat combining kernel object.
383                 As a result the current thread can become a combiner or can wait for
384                 another combiner performs \p pRec operation.
385
386                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
387                 for each active non-empty publication record.
388             */
389             template <class Container>
390             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
391             {
392                 assert( nOpId >= req_Operation );
393                 assert( pRec );
394                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
395                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
396
397                 m_Stat.onOperation();
398
399                 try_combining( owner, pRec );
400             }
401
402             /// Trying to execute operation \p nOpId in batch-combine mode
403             /**
404                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
405                 \p owner is a container that owns flat combining kernel object.
406                 As a result the current thread can become a combiner or can wait for
407                 another combiner performs \p pRec operation.
408
409                 If the thread becomes a combiner, the kernel calls \p owner.fc_process
410                 giving the container the full access over publication list. This function
411                 is useful for an elimination technique if the container supports any kind of
412                 that. The container can perform multiple pass through publication list.
413
414                 \p owner.fc_process has two arguments - forward iterators on begin and end of
415                 publication list, see \ref iterator class. For each processed record the container
416                 should call \ref operation_done function to mark the record as processed.
417
418                 On the end of \p %batch_combine the \ref combine function is called
419                 to process rest of publication records.
420             */
421             template <class Container>
422             void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
423             {
424                 assert( nOpId >= req_Operation );
425                 assert( pRec );
426                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
427                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
428
429                 m_Stat.onOperation();
430
431                 try_batch_combining( owner, pRec );
432             }
433
434             /// Waits for end of combining
435             void wait_while_combining() const
436             {
437                 lock_guard l( m_Mutex );
438             }
439
440             /// Marks \p rec as executed
441             /**
442                 This function should be called by container if batch_combine mode is used.
443                 For usual combining (see \ref combine) this function is excess.
444             */
445             void operation_done( publication_record& rec )
446             {
447                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
448             }
449
450             /// Internal statistics
451             stat const& statistics() const
452             {
453                 return m_Stat;
454             }
455
456             //@cond
457             // For container classes based on flat combining
458             stat& internal_statistics() const
459             {
460                 return m_Stat;
461             }
462             //@endcond
463
464             /// Returns the compact factor
465             unsigned int compact_factor() const
466             {
467                 return m_nCompactFactor + 1;
468             }
469
470             /// Returns number of combining passes for combiner thread
471             unsigned int combine_pass_count() const
472             {
473                 return m_nCombinePassCount;
474             }
475
476         public:
477             /// Publication list iterator
478             /**
479                 Iterators are intended for batch processing by container's
480                 \p fc_process function.
481                 The iterator allows iterate through active publication list.
482             */
483             class iterator
484             {
485                 //@cond
486                 friend class kernel;
487                 publication_record_type * m_pRec;
488                 //@endcond
489
490             protected:
491                 //@cond
492                 iterator( publication_record_type * pRec )
493                     : m_pRec( pRec )
494                 {
495                     skip_inactive();
496                 }
497
498                 void skip_inactive()
499                 {
500                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
501                                     || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
502                     {
503                         m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
504                     }
505                 }
506                 //@endcond
507
508             public:
509                 /// Initializes an empty iterator object
510                 iterator()
511                     : m_pRec( nullptr )
512                 {}
513
514                 /// Copy ctor
515                 iterator( iterator const& src )
516                     : m_pRec( src.m_pRec )
517                 {}
518
519                 /// Pre-increment
520                 iterator& operator++()
521                 {
522                     assert( m_pRec );
523                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
524                     skip_inactive();
525                     return *this;
526                 }
527
528                 /// Post-increment
529                 iterator operator++(int)
530                 {
531                     assert( m_pRec );
532                     iterator it(*this);
533                     ++(*this);
534                     return it;
535                 }
536
537                 /// Dereference operator, can return \p nullptr
538                 publication_record_type * operator ->()
539                 {
540                     return m_pRec;
541                 }
542
543                 /// Dereference operator, the iterator should not be an end iterator
544                 publication_record_type& operator*()
545                 {
546                     assert( m_pRec );
547                     return *m_pRec;
548                 }
549
550                 /// Iterator equality
551                 friend bool operator==( iterator it1, iterator it2 )
552                 {
553                     return it1.m_pRec == it2.m_pRec;
554                 }
555
556                 /// Iterator inequality
557                 friend bool operator!=( iterator it1, iterator it2 )
558                 {
559                     return !( it1 == it2 );
560                 }
561             };
562
563             /// Returns an iterator to the first active publication record
564             iterator begin()    { return iterator(m_pHead); }
565
566             /// Returns an iterator to the end of publication list. Should not be dereferenced.
567             iterator end()      { return iterator(); }
568
569         private:
570             //@cond
571             static void tls_cleanup( publication_record_type * pRec )
572             {
573                 // Thread done
574                 // pRec that is TLS data should be excluded from publication list
575                 if ( pRec ) {
576                     if ( pRec->pOwner ) {
577                         // kernel is alive
578                         pRec->nState.store( removed, memory_model::memory_order_release );
579                     }
580                     else {
581                         // kernel already deleted
582                         free_publication_record( pRec );
583                     }
584                 }
585             }
586
587             static void free_publication_record( publication_record_type * pRec )
588             {
589                 cxx11_allocator().Delete( pRec );
590             }
591
592             void init()
593             {
594                 assert( m_pThreadRec.get() == nullptr );
595                 publication_record_type * pRec = cxx11_allocator().New();
596                 m_pHead = pRec;
597                 pRec->pOwner = this;
598                 m_pThreadRec.reset( pRec );
599                 m_Stat.onCreatePubRecord();
600             }
601
602             void publish( publication_record_type * pRec )
603             {
604                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
605
606                 pRec->nAge.store( m_nCount.load(memory_model::memory_order_acquire), memory_model::memory_order_release );
607                 pRec->nState.store( active, memory_model::memory_order_release );
608
609                 // Insert record to publication list
610                 if ( m_pHead != static_cast<publication_record *>(pRec) ) {
611                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
612                     if ( p != static_cast<publication_record *>( pRec )) {
613                         do {
614                             pRec->pNext = p;
615                             // Failed CAS changes p
616                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
617                             memory_model::memory_order_release, atomics::memory_order_relaxed ));
618                         m_Stat.onActivatePubRecord();
619                     }
620                 }
621             }
622
623             void republish( publication_record_type * pRec )
624             {
625                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
626                     // The record has been excluded from publication list. Reinsert it
627                     publish( pRec );
628                 }
629             }
630
631             template <class Container>
632             void try_combining( Container& owner, publication_record_type * pRec )
633             {
634                 if ( m_Mutex.try_lock() ) {
635                     // The thread becomes a combiner
636                     lock_guard l( m_Mutex, std::adopt_lock_t() );
637
638                     // The record pRec can be excluded from publication list. Re-publish it
639                     republish( pRec );
640
641                     combining( owner );
642                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
643                 }
644                 else {
645                     // There is another combiner, wait while it executes our request
646                     if ( !wait_for_combining( pRec ) ) {
647                         // The thread becomes a combiner
648                         lock_guard l( m_Mutex, std::adopt_lock_t() );
649
650                         // The record pRec can be excluded from publication list. Re-publish it
651                         republish( pRec );
652
653                         combining( owner );
654                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
655                     }
656                 }
657             }
658
659             template <class Container>
660             void try_batch_combining( Container& owner, publication_record_type * pRec )
661             {
662                 if ( m_Mutex.try_lock() ) {
663                     // The thread becomes a combiner
664                     lock_guard l( m_Mutex, std::adopt_lock_t() );
665
666                     // The record pRec can be excluded from publication list. Re-publish it
667                     republish( pRec );
668
669                     batch_combining( owner );
670                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
671                 }
672                 else {
673                     // There is another combiner, wait while it executes our request
674                     if ( !wait_for_combining( pRec ) ) {
675                         // The thread becomes a combiner
676                         lock_guard l( m_Mutex, std::adopt_lock_t() );
677
678                         // The record pRec can be excluded from publication list. Re-publish it
679                         republish( pRec );
680
681                         batch_combining( owner );
682                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
683                     }
684                 }
685             }
686
687             template <class Container>
688             void combining( Container& owner )
689             {
690                 // The thread is a combiner
691                 assert( !m_Mutex.try_lock() );
692
693                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
694
695                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
696                     if ( !combining_pass( owner, nCurAge ))
697                         break;
698
699                 m_Stat.onCombining();
700                 if ( (nCurAge & m_nCompactFactor) == 0 )
701                     compact_list( nCurAge );
702             }
703
704             template <class Container>
705             bool combining_pass( Container& owner, unsigned int nCurAge )
706             {
707                 publication_record * pPrev = nullptr;
708                 publication_record * p = m_pHead;
709                 bool bOpDone = false;
710                 while ( p ) {
711                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
712                         case active:
713                             if ( p->op() >= req_Operation ) {
714                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
715                                 owner.fc_apply( static_cast<publication_record_type *>(p) );
716                                 operation_done( *p );
717                                 bOpDone = true;
718                             }
719                             break;
720                         case inactive:
721                             // Only m_pHead can be inactive in the publication list
722                             assert( p == m_pHead );
723                             break;
724                         case removed:
725                             // The record should be removed
726                             p = unlink_and_delete_record( pPrev, p );
727                             continue;
728                         default:
729                             /// ??? That is impossible
730                             assert(false);
731                     }
732                     pPrev = p;
733                     p = p->pNext.load( memory_model::memory_order_acquire );
734                 }
735                 return bOpDone;
736             }
737
738             template <class Container>
739             void batch_combining( Container& owner )
740             {
741                 // The thread is a combiner
742                 assert( !m_Mutex.try_lock() );
743
744                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
745
746                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
747                     owner.fc_process( begin(), end() );
748
749                 combining_pass( owner, nCurAge );
750                 m_Stat.onCombining();
751                 if ( (nCurAge & m_nCompactFactor) == 0 )
752                     compact_list( nCurAge );
753             }
754
755             bool wait_for_combining( publication_record_type * pRec )
756             {
757                 back_off bkoff;
758                 while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {
759
760                     // The record can be excluded from publication list. Reinsert it
761                     republish( pRec );
762
763                     bkoff();
764
765                     if ( m_Mutex.try_lock() ) {
766                         if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
767                             m_Mutex.unlock();
768                             break;
769                         }
770                         // The thread becomes a combiner
771                         return false;
772                     }
773                 }
774                 return true;
775             }
776
777             void compact_list( unsigned int const nCurAge )
778             {
779                 // Thinning publication list
780                 publication_record * pPrev = nullptr;
781                 for ( publication_record * p = m_pHead; p; ) {
782                     if ( p->nState.load( memory_model::memory_order_acquire ) == active
783                       && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
784                     {
785                         if ( pPrev ) {
786                             publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
787                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
788                                 memory_model::memory_order_release, atomics::memory_order_relaxed ))
789                             {
790                                 p->nState.store( inactive, memory_model::memory_order_release );
791                                 p = pNext;
792                                 m_Stat.onDeactivatePubRecord();
793                                 continue;
794                             }
795                         }
796                     }
797                     pPrev = p;
798                     p = p->pNext.load( memory_model::memory_order_acquire );
799                 }
800
801                 m_Stat.onCompactPublicationList();
802             }
803
804             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
805             {
806                 if ( pPrev ) {
807                     publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
808                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
809                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
810                     {
811                         free_publication_record( static_cast<publication_record_type *>( p ));
812                         m_Stat.onDeletePubRecord();
813                     }
814                     return pNext;
815                 }
816                 else {
817                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
818                     free_publication_record( static_cast<publication_record_type *>( p ));
819                     m_Stat.onDeletePubRecord();
820                     return m_pHead;
821                 }
822             }
823             //@endcond
824         };
825
826         //@cond
827         class container
828         {
829         public:
830             template <typename PubRecord>
831             void fc_apply( PubRecord * )
832             {
833                 assert( false );
834             }
835
836             template <typename Iterator>
837             void fc_process( Iterator, Iterator )
838             {
839                 assert( false );
840             }
841         };
842         //@endcond
843
844     } // namespace flat_combining
845 }} // namespace cds::algo
846
847 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H