486fe7247bb8c870e7c5d25571d02aeec977e871
[libcds.git] / cds / algo / flat_combining.h
1 //$$CDS-header$$
2
3 #ifndef __CDS_ALGO_FLAT_COMBINING_H
4 #define __CDS_ALGO_FLAT_COMBINING_H
5
6 #include <mutex>
7 #include <cds/cxx11_atomic.h>
8 #include <cds/details/allocator.h>
9 #include <cds/algo/backoff_strategy.h>
10 #include <cds/lock/spinlock.h>
11 #include <cds/opt/options.h>
12 #include <cds/algo/int_algo.h>
13 #include <boost/thread/tss.hpp>     // thread_specific_ptr
14
15 namespace cds { namespace algo {
16
17     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
18     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
19
20     /// Flat combining
21     /**
22         @anchor cds_flat_combining_description
23         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
24         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
25         The technique converts a sequential data structure to its concurrent implementation.
26         A few structures are added to the sequential implementation: a <i>global lock</i>,
27         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
28         of a <i>publication list</i>. The publication list is a list of thread-local records
29         of a size proportional to the number of threads that are concurrently accessing the shared object.
30
31         Each thread \p t accessing the structure to perform an invocation of some method \p m
32         on the shared object executes the following sequence of steps:
33         <ol>
34         <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
35         sequentially to the shared object in the <i>request</i> field of your thread local publication
36         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
37         be used to receive the response. If your thread local publication record is marked as active
38         continue to step 2, otherwise continue to step 5.</li>
39         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
40         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
41         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
42         record is active. If your record is inactive proceed to step 5. Once the response is available,
43         reset the request field to null and return the response.</li>
44         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
45         return to spinning in step 2.</li>
46         <li>Otherwise, you hold the lock and are a combiner.
47         <ul>
48             <li>Increment the combining pass count by one.</li>
49             <li>Execute a \p fc_apply() by traversing the publication list from the head,
50             combining all nonnull method call invocations, setting the <i>age</i> of each of these records
51             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
52             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
53             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
54             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
55             by the head in the list), remove from the publication list all records whose <i>age</i> is
56             much smaller than the current <i>count</i>. This is done by removing the node and marking it
57             as inactive.</li>
58             <li>Release the lock.</li>
59         </ul>
60         <li>If you have no thread local publication record allocate one, marked as active. If you already
61         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
62         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
63         </ol>
64
65         As the test results show, the flat combining technique is suitable for non-intrusive containers
66         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
67         less impressive results.
68
69         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
70
71         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
72     */
73     namespace flat_combining {
74
75         /// Special values of publication_record::nRequest
76         enum request_value
77         {
78             req_EmptyRecord,    ///< Publication record is empty
79             req_Response,       ///< Operation is done
80
81             req_Operation       ///< First operation id for derived classes
82         };
83
84         /// publication_record state
85         enum record_state {
86             inactive,       ///< Record is inactive
87             active,         ///< Record is active
88             removed         ///< Record should be removed
89         };
90
91         /// Record of publication list
92         /**
93             Each data structure based on flat combining contains a class derived from \p %publication_record
94         */
95         struct publication_record {
96             atomics::atomic<unsigned int>    nRequest;   ///< Request field (depends on data structure)
97             atomics::atomic<unsigned int>    nState;     ///< Record state: inactive, active, removed
98             unsigned int                        nAge;       ///< Age of the record
99             atomics::atomic<publication_record *> pNext; ///< Next record in publication list
100             void *                              pOwner;    ///< [internal data] Pointer to \ref kernel object that manages the publication list
101
102             /// Initializes publication record
103             publication_record()
104                 : nRequest( req_EmptyRecord )
105                 , nState( inactive )
106                 , nAge(0)
107                 , pNext( nullptr )
108                 , pOwner( nullptr )
109             {}
110
111             /// Returns the value of \p nRequest field
112             unsigned int op() const
113             {
114                 return nRequest.load( atomics::memory_order_relaxed );
115             }
116
117             /// Checks if the operation is done
118             bool is_done() const
119             {
120                 return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
121             }
122         };
123
124         /// Flat combining internal statistics
125         template <typename Counter = cds::atomicity::event_counter >
126         struct stat
127         {
128             typedef Counter counter_type;   ///< Event counter type
129
130             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
131             counter_type    m_nCombiningCount   ;   ///< Combining call count
132             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
133             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
134             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
135             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
136             counter_type    m_nPubRecordDeteted ;   ///< Count of deleted publication records
137             counter_type    m_nAcquirePubRecCount;  ///< Count of acquiring publication record
138             counter_type    m_nReleasePubRecCount;  ///< Count on releasing publication record
139
140             /// Returns current combining factor
141             /**
142                 Combining factor is how many operations perform in one combine pass:
143                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
144             */
145             double combining_factor() const
146             {
147                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
148             }
149
150             //@cond
151             void    onOperation()               { ++m_nOperationCount; }
152             void    onCombining()               { ++m_nCombiningCount; }
153             void    onCompactPublicationList()  { ++m_nCompactPublicationList; }
154             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord; }
155             void    onActivatPubRecord()        { ++m_nActivatePubRecord; }
156             void    onCreatePubRecord()         { ++m_nPubRecordCreated; }
157             void    onDeletePubRecord()         { ++m_nPubRecordDeteted; }
158             void    onAcquirePubRecord()        { ++m_nAcquirePubRecCount; }
159             void    onReleasePubRecord()        { ++m_nReleasePubRecCount; }
160             //@endcond
161         };
162
163         /// Flat combining dummy internal statistics
164         struct empty_stat
165         {
166             //@cond
167             void    onOperation()               {}
168             void    onCombining()               {}
169             void    onCompactPublicationList()  {}
170             void    onDeactivatePubRecord()     {}
171             void    onActivatPubRecord()        {}
172             void    onCreatePubRecord()         {}
173             void    onDeletePubRecord()         {}
174             void    onAcquirePubRecord()        {}
175             void    onReleasePubRecord()        {}
176             //@endcond
177         };
178
179         /// Type traits of \ref kernel class
180         /**
181             You can define different type traits for \ref kernel
182             by specifying your struct based on \p %traits
183             or by using \ref make_traits metafunction.
184         */
185         struct traits
186         {
187             typedef cds::lock::Spin             lock_type;  ///< Lock type
188             typedef cds::backoff::delay_of<2>   back_off;   ///< Back-off strategy
189             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating publication_record derivatives)
190             typedef empty_stat                  stat;       ///< Internal statistics
191             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
192         };
193
194         /// Metafunction converting option list to traits
195         /**
196             \p Options are:
197             - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
198             - \p opt::back_off - back-off strategy, defalt is \p cds::backoff::delay_of<2>
199             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
200             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
201             - \p opt::memory_model - C++ memory ordering model.
202                 List of all available memory ordering see opt::memory_model.
203                 Default if cds::opt::v:relaxed_ordering
204         */
205         template <typename... Options>
206         struct make_traits {
207 #   ifdef CDS_DOXYGEN_INVOKED
208             typedef implementation_defined type ;   ///< Metafunction result
209 #   else
210             typedef typename cds::opt::make_options<
211                 typename cds::opt::find_type_traits< traits, Options... >::type
212                 ,Options...
213             >::type   type;
214 #   endif
215         };
216
217         /// The kernel of flat combining
218         /**
219             Template parameters:
220             - \p PublicationRecord - a type derived from \ref publication_record
221             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
222                 \ref make_traits metafunction can be used to create type traits
223
224             The kernel object should be a member of a container class. The container cooperates with flat combining
225             kernel object. There are two ways to interact with the kernel:
226             - One-by-one processing the active records of the publication list. This mode provides \ref combine function:
227               the container acquires its publication record by \ref acquire_record, fills its fields and calls
228               \p combine function of its kernel object. If the current thread becomes a combiner, the kernel
229               calls \p fc_apply function of the container for each active non-empty record. Then, the container
230               should release its publication record by \ref release_record. Only one pass through the publication
231               list is possible.
232             - Batch processing (\ref batch_combine function). It this mode the container obtains access
233               to entire publication list. This mode allows the container to perform an elimination, for example,
234               the stack can collide \p push and \p pop requests. The sequence of invocations is the following:
235               the container acquires its publication record by \ref acquire_record, fills its field and call
236               \p batch_combine function of its kernel object. If the current thread becomes a combiner,
237               the kernel calls \p fc_process function of the container passing two iterators pointing to
238               begin and end of publication list (see \ref iterator class). The iterators allows
239               multiple pass through active records of publication list. For each processed record the container
240               should call \ref operation_done function. On the end, the container should release
241               its record by \ref release_record.
242         */
243         template <
244             typename PublicationRecord
245             ,typename Traits = traits
246         >
247         class kernel
248         {
249         public:
250             typedef PublicationRecord   publication_record_type;   ///< publication record type
251             typedef Traits   traits;                               ///< Type traits
252             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
253             typedef typename traits::back_off  back_off;           ///< back-off strategy type
254             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
255             typedef typename traits::stat      stat;               ///< Internal statistics
256             typedef typename traits::memory_model memory_model;    ///< C++ memory model
257
258         protected:
259             //@cond
260             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
261             typedef std::lock_guard<global_lock_type> lock_guard;
262             //@endcond
263
264         protected:
265             unsigned int                m_nCount;   ///< Count of combining passes
266             publication_record_type *   m_pHead;    ///< Head of publication list
267             boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record
268             mutable global_lock_type    m_Mutex;    ///< Global mutex
269             mutable stat                m_Stat;     ///< Internal statistics
270             unsigned int const          m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
271             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
272
273         public:
274             /// Initializes the object
275             /**
276                 Compact factor = 64
277
278                 Combiner pass count = 8
279             */
280             kernel()
281                 : m_nCount(0)
282                 , m_pHead( nullptr )
283                 , m_pThreadRec( tls_cleanup )
284                 , m_nCompactFactor( 64 - 1 ) // binary mask
285                 , m_nCombinePassCount( 8 )
286             {
287                 init();
288             }
289
290             /// Initializes the object
291             kernel(
292                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
293                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
294                 )
295                 : m_nCount(0)
296                 , m_pHead( nullptr )
297                 , m_pThreadRec( tls_cleanup )
298                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
299                 , m_nCombinePassCount( nCombinePassCount )
300             {
301                 init();
302             }
303
304             /// Destroys the objects and mark all publication records as inactive
305             ~kernel()
306             {
307                 // mark all publication record as detached
308                 for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed ))
309                     p->pOwner = nullptr;
310             }
311
312             /// Gets publication list record for the current thread
313             /**
314                 If there is no publication record for the current thread
315                 the function allocates it.
316             */
317             publication_record_type * acquire_record()
318             {
319                 publication_record_type * pRec = m_pThreadRec.get();
320                 if ( !pRec ) {
321                     // Allocate new publication record
322                     pRec = cxx11_allocator().New();
323                     pRec->pOwner = reinterpret_cast<void *>( this );
324                     m_pThreadRec.reset( pRec );
325                     m_Stat.onCreatePubRecord();
326                 }
327
328                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
329                     publish( pRec );
330
331                 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
332
333                 m_Stat.onAcquirePubRecord();
334                 return pRec;
335             }
336
337             /// Marks publication record for the current thread as empty
338             void release_record( publication_record_type * pRec )
339             {
340                 assert( pRec->is_done() );
341                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_relaxed );
342                 m_Stat.onReleasePubRecord();
343             }
344
345             /// Trying to execute operation \p nOpId
346             /**
347                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
348                 \p owner is a container that is owner of flat combining kernel object.
349                 As a result the current thread can become a combiner or can wait for
350                 another combiner performs \p pRec operation.
351
352                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
353                 for each active non-empty publication record.
354             */
355             template <class Container>
356             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
357             {
358                 assert( nOpId >= req_Operation );
359                 assert( pRec );
360                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
361                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
362
363                 m_Stat.onOperation();
364
365                 try_combining( owner, pRec );
366             }
367
368             /// Trying to execute operation \p nOpId in batch-combine mode
369             /**
370                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
371                 \p owner is a container that owns flat combining kernel object.
372                 As a result the current thread can become a combiner or can wait for
373                 another combiner performs \p pRec operation.
374
375                 If the thread becomes a combiner, the kernel calls \p owner.fc_process
376                 giving the container the full access over publication list. This function
377                 is useful for an elimination technique if the container supports any kind of
378                 that. The container can perform multiple pass through publication list.
379
380                 \p owner.fc_process has two arguments - forward iterators on begin and end of
381                 publication list, see \ref iterator class. For each processed record the container
382                 should call \ref operation_done function to mark the record as processed.
383
384                 On the end of \p %batch_combine the \ref combine function is called
385                 to process rest of publication records.
386             */
387             template <class Container>
388             void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
389             {
390                 assert( nOpId >= req_Operation );
391                 assert( pRec );
392                 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
393                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
394
395                 m_Stat.onOperation();
396
397                 try_batch_combining( owner, pRec );
398             }
399
400             /// Waits for end of combining
401             void wait_while_combining() const
402             {
403                 lock_guard l( m_Mutex );
404             }
405
406             /// Marks \p rec as executed
407             /**
408                 This function should be called by container if batch_combine mode is used.
409                 For usual combining (see \ref combine) this function is excess.
410             */
411             void operation_done( publication_record& rec )
412             {
413                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
414             }
415
416             /// Internal statistics
417             stat const& statistics() const
418             {
419                 return m_Stat;
420             }
421
422             //@cond
423             // For container classes based on flat combining
424             stat& internal_statistics() const
425             {
426                 return m_Stat;
427             }
428             //@endcond
429
430             /// Returns the compact factor
431             unsigned int compact_factor() const
432             {
433                 return m_nCompactFactor + 1;
434             }
435
436             /// Returns number of combining passes for combiner thread
437             unsigned int combine_pass_count() const
438             {
439                 return m_nCombinePassCount;
440             }
441
442         public:
443             /// Publication list iterator
444             /**
445                 Iterators are intended for batch processing by container's
446                 \p fc_process function.
447                 The iterator allows iterate through active publication list.
448             */
449             class iterator
450             {
451                 //@cond
452                 friend class kernel;
453                 publication_record_type * m_pRec;
454                 //@endcond
455
456             protected:
457                 //@cond
458                 iterator( publication_record_type * pRec )
459                     : m_pRec( pRec )
460                 {
461                     skip_inactive();
462                 }
463
464                 void skip_inactive()
465                 {
466                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
467                                     || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
468                     {
469                         m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
470                     }
471                 }
472                 //@endcond
473
474             public:
475                 /// Initializes an empty iterator object
476                 iterator()
477                     : m_pRec( nullptr )
478                 {}
479
480                 /// Copy ctor
481                 iterator( iterator const& src )
482                     : m_pRec( src.m_pRec )
483                 {}
484
485                 /// Pre-increment
486                 iterator& operator++()
487                 {
488                     assert( m_pRec );
489                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
490                     skip_inactive();
491                     return *this;
492                 }
493
494                 /// Post-increment
495                 iterator operator++(int)
496                 {
497                     assert( m_pRec );
498                     iterator it(*this);
499                     ++(*this);
500                     return it;
501                 }
502
503                 /// Dereference operator, can return \p nullptr
504                 publication_record_type * operator ->()
505                 {
506                     return m_pRec;
507                 }
508
509                 /// Dereference operator, the iterator should not be an end iterator
510                 publication_record_type& operator*()
511                 {
512                     assert( m_pRec );
513                     return *m_pRec;
514                 }
515
516                 /// Iterator equality
517                 friend bool operator==( iterator it1, iterator it2 )
518                 {
519                     return it1.m_pRec == it2.m_pRec;
520                 }
521
522                 /// Iterator inequality
523                 friend bool operator!=( iterator it1, iterator it2 )
524                 {
525                     return !( it1 == it2 );
526                 }
527             };
528
529             /// Returns an iterator to the first active publication record
530             iterator begin()    { return iterator(m_pHead); }
531
532             /// Returns an iterator to the end of publication list. Should not be dereferenced.
533             iterator end()      { return iterator(); }
534
535         private:
536             //@cond
537             static void tls_cleanup( publication_record_type * pRec )
538             {
539                 // Thread done
540                 // pRec that is TLS data should be excluded from publication list
541                 if ( pRec ) {
542                     if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) {
543                         // record is active and kernel is alive
544                         unsigned int nState = active;
545                         pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
546                     }
547                     else {
548                         // record is not in publication list or kernel already deleted
549                         cxx11_allocator().Delete( pRec );
550                     }
551                 }
552             }
553
554             void init()
555             {
556                 assert( m_pThreadRec.get() == nullptr );
557                 publication_record_type * pRec = cxx11_allocator().New();
558                 m_pHead = pRec;
559                 pRec->pOwner = this;
560                 m_pThreadRec.reset( pRec );
561                 m_Stat.onCreatePubRecord();
562             }
563
564             void publish( publication_record_type * pRec )
565             {
566                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
567
568                 pRec->nAge = m_nCount;
569                 pRec->nState.store( active, memory_model::memory_order_release );
570
571                 // Insert record to publication list
572                 if ( m_pHead != static_cast<publication_record *>(pRec) ) {
573                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
574                     if ( p != static_cast<publication_record *>( pRec )) {
575                         do {
576                             pRec->pNext = p;
577                             // Failed CAS changes p
578                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
579                             memory_model::memory_order_release, atomics::memory_order_relaxed ));
580                         m_Stat.onActivatPubRecord();
581                     }
582                 }
583             }
584
585             void republish( publication_record_type * pRec )
586             {
587                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
588                     // The record has been excluded from publication list. Reinsert it
589                     publish( pRec );
590                 }
591             }
592
593             template <class Container>
594             void try_combining( Container& owner, publication_record_type * pRec )
595             {
596                 if ( m_Mutex.try_lock() ) {
597                     // The thread becomes a combiner
598                     lock_guard l( m_Mutex, std::adopt_lock_t() );
599
600                     // The record pRec can be excluded from publication list. Re-publish it
601                     republish( pRec );
602
603                     combining( owner );
604                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
605                 }
606                 else {
607                     // There is another combiner, wait while it executes our request
608                     if ( !wait_for_combining( pRec ) ) {
609                         // The thread becomes a combiner
610                         lock_guard l( m_Mutex, std::adopt_lock_t() );
611
612                         // The record pRec can be excluded from publication list. Re-publish it
613                         republish( pRec );
614
615                         combining( owner );
616                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
617                     }
618                 }
619             }
620
621             template <class Container>
622             void try_batch_combining( Container& owner, publication_record_type * pRec )
623             {
624                 if ( m_Mutex.try_lock() ) {
625                     // The thread becomes a combiner
626                     lock_guard l( m_Mutex, std::adopt_lock_t() );
627
628                     // The record pRec can be excluded from publication list. Re-publish it
629                     republish( pRec );
630
631                     batch_combining( owner );
632                     assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
633                 }
634                 else {
635                     // There is another combiner, wait while it executes our request
636                     if ( !wait_for_combining( pRec ) ) {
637                         // The thread becomes a combiner
638                         lock_guard l( m_Mutex, std::adopt_lock_t() );
639
640                         // The record pRec can be excluded from publication list. Re-publish it
641                         republish( pRec );
642
643                         batch_combining( owner );
644                         assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
645                     }
646                 }
647             }
648
649             template <class Container>
650             void combining( Container& owner )
651             {
652                 // The thread is a combiner
653                 assert( !m_Mutex.try_lock() );
654
655                 unsigned int const nCurAge = ++m_nCount;
656
657                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
658                     if ( !combining_pass( owner, nCurAge ))
659                         break;
660
661                 m_Stat.onCombining();
662                 if ( (nCurAge & m_nCompactFactor) == 0 )
663                     compact_list( nCurAge );
664             }
665
666             template <class Container>
667             bool combining_pass( Container& owner, unsigned int nCurAge )
668             {
669                 publication_record * pPrev = nullptr;
670                 publication_record * p = m_pHead;
671                 bool bOpDone = false;
672                 while ( p ) {
673                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
674                         case active:
675                             if ( p->op() >= req_Operation ) {
676                                 p->nAge = nCurAge;
677                                 owner.fc_apply( static_cast<publication_record_type *>(p) );
678                                 operation_done( *p );
679                                 bOpDone = true;
680                             }
681                             break;
682                         case inactive:
683                             // Only m_pHead can be inactive in the publication list
684                             assert( p == m_pHead );
685                             break;
686                         case removed:
687                             // The record should be removed
688                             p = unlink_and_delete_record( pPrev, p );
689                             continue;
690                         default:
691                             /// ??? That is impossible
692                             assert(false);
693                     }
694                     pPrev = p;
695                     p = p->pNext.load( memory_model::memory_order_acquire );
696                 }
697                 return bOpDone;
698             }
699
700             template <class Container>
701             void batch_combining( Container& owner )
702             {
703                 // The thread is a combiner
704                 assert( !m_Mutex.try_lock() );
705
706                 unsigned int const nCurAge = ++m_nCount;
707
708                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
709                     owner.fc_process( begin(), end() );
710
711                 combining_pass( owner, nCurAge );
712                 m_Stat.onCombining();
713                 if ( (nCurAge & m_nCompactFactor) == 0 )
714                     compact_list( nCurAge );
715             }
716
717             bool wait_for_combining( publication_record_type * pRec )
718             {
719                 back_off bkoff;
720                 while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {
721
722                     // The record can be excluded from publication list. Reinsert it
723                     republish( pRec );
724
725                     bkoff();
726
727                     if ( m_Mutex.try_lock() ) {
728                         if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
729                             m_Mutex.unlock();
730                             break;
731                         }
732                         // The thread becomes a combiner
733                         return false;
734                     }
735                 }
736                 return true;
737             }
738
739             void compact_list( unsigned int const nCurAge )
740             {
741                 // Thinning publication list
742                 publication_record * pPrev = nullptr;
743                 for ( publication_record * p = m_pHead; p; ) {
744                     if ( p->nState.load( memory_model::memory_order_acquire ) == active && p->nAge + m_nCompactFactor < nCurAge ) {
745                         if ( pPrev ) {
746                             publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
747                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
748                                 memory_model::memory_order_release, atomics::memory_order_relaxed ))
749                             {
750                                 p->nState.store( inactive, memory_model::memory_order_release );
751                                 p = pNext;
752                                 m_Stat.onDeactivatePubRecord();
753                                 continue;
754                             }
755                         }
756                     }
757                     pPrev = p;
758                     p = p->pNext.load( memory_model::memory_order_acquire );
759                 }
760
761                 m_Stat.onCompactPublicationList();
762             }
763
764             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
765             {
766                 if ( pPrev ) {
767                     publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
768                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
769                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
770                     {
771                         cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
772                         m_Stat.onDeletePubRecord();
773                     }
774                     return pNext;
775                 }
776                 else {
777                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
778                     cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
779                     m_Stat.onDeletePubRecord();
780                     return m_pHead;
781                 }
782             }
783             //@endcond
784         };
785
786         //@cond
787         class container
788         {
789         public:
790             template <typename PubRecord>
791             void fc_apply( PubRecord * )
792             {
793                 assert( false );
794             }
795
796             template <typename Iterator>
797             void fc_process( Iterator, Iterator )
798             {
799                 assert( false );
800             }
801         };
802         //@endcond
803
804     } // namespace flat_combining
805 }} // namespace cds::algo
806
807 #endif // #ifndef __CDS_ALGO_FLAT_COMBINING_H