//$$CDS-header$$

#ifndef __CDS_ALGO_FLAT_COMBINING_H
#define __CDS_ALGO_FLAT_COMBINING_H

#include <mutex>
#include <cds/cxx11_atomic.h>
#include <cds/details/allocator.h>
#include <cds/algo/backoff_strategy.h>
#include <cds/lock/spinlock.h>
#include <cds/opt/options.h>
#include <cds/algo/int_algo.h>
#include <boost/thread/tss.hpp>     // thread_specific_ptr

namespace cds { namespace algo {

    /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
    /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers

    /// Flat combining
    /**
        @anchor cds_flat_combining_description
        The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their paper
        [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
        The technique converts a sequential data structure to its concurrent implementation.
        A few structures are added to the sequential implementation: a <i>global lock</i>,
        a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
        of a <i>publication list</i>. The publication list is a list of thread-local records
        of a size proportional to the number of threads that are concurrently accessing the shared object.

        Each thread \p t accessing the structure to perform an invocation of some method \p m
        on the shared object executes the following sequence of steps (see also the sketch below):
        <ol>
        <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
        sequentially to the shared object in the <i>request</i> field of your thread-local publication
        record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
        be used to receive the response. If your thread-local publication record is marked as active,
        continue to step 2, otherwise continue to step 5.</li>
        <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
        field waiting for a response to the invocation (one can add a yield at this point to allow other threads
        on the same core to run). While spinning, check once in a while whether the lock is still taken and whether your
        record is active. If your record is inactive, proceed to step 5. Once the response is available,
        reset the request field to null and return the response.</li>
        <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
        return to spinning in step 2.</li>
        <li>Otherwise, you hold the lock and are a combiner.
        <ul>
            <li>Increment the combining pass count by one.</li>
            <li>Execute a \p fc_apply() by traversing the publication list from the head,
            combining all nonnull method call invocations, setting the <i>age</i> of each of these records
            to the current <i>count</i>, applying the combined method calls to the data structure, and returning
            responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
            <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
            list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
            by the head in the list), remove from the publication list all records whose <i>age</i> is
            much smaller than the current <i>count</i>. This is done by removing the node and marking it
            as inactive.</li>
            <li>Release the lock.</li>
        </ul></li>
        <li>If you have no thread-local publication record, allocate one, marked as active. If you already
        have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
        the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
        </ol>
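
        The protocol above can be summarized by the following pseudocode sketch;
        the names used in it (\p myRec, \p globalLock, \p applyCombinedCalls,
        \p evictOldRecords) are illustrative and are not part of libcds:
        \code
        Response invoke( OpCode op, Args args )
        {
            if ( !myRec.active )
                activateAndPush( myRec );           // step 5: activate the record, CAS it to the list head
            myRec.request = { op, args };           // step 1: publish the invocation

            for (;;) {
                while ( globalLock.isLocked() ) {   // step 2: another thread combines for us
                    if ( myRec.hasResponse() )
                        return myRec.resetAndReturnResponse();
                    if ( !myRec.active )
                        activateAndPush( myRec );   // the record was evicted - step 5 again
                    yield();
                }
                if ( myRec.hasResponse() )
                    return myRec.resetAndReturnResponse();
                if ( globalLock.tryLock() ) {       // steps 3-4: become the combiner
                    ++passCount;
                    applyCombinedCalls();           // fc_apply pass over the publication list
                    if ( cleanupIsDue( passCount ))
                        evictOldRecords();          // compact the publication list
                    globalLock.unlock();
                    return myRec.resetAndReturnResponse();
                }
                // failed to acquire the lock - return to spinning in step 2
            }
        }
        \endcode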

        As the test results show, the flat combining technique is suitable for non-intrusive containers
        like stack, queue, deque. For intrusive concurrent containers flat combining demonstrates
        less impressive results.

        \ref cds_flat_combining_container "List of FC-based containers" in libcds.

        \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
    */
    namespace flat_combining {

        /// Special values of publication_record::nRequest
        enum request_value
        {
            req_EmptyRecord,    ///< Publication record is empty
            req_Response,       ///< Operation is done

            req_Operation       ///< First operation id for derived classes
        };

        /// publication_record state
        enum record_state {
            inactive,       ///< Record is inactive
            active,         ///< Record is active
            removed         ///< Record should be removed
        };

        /// Record of publication list
        /**
            Each data structure based on flat combining contains a class derived from \p %publication_record
        */
        struct publication_record {
            atomics::atomic<unsigned int>           nRequest;   ///< Request field (depends on data structure)
            atomics::atomic<unsigned int>           nState;     ///< Record state: inactive, active, removed
            unsigned int                            nAge;       ///< Age of the record
            atomics::atomic<publication_record *>   pNext;      ///< Next record in publication list
            void *                                  pOwner;     ///< [internal data] Pointer to \ref kernel object that manages the publication list

            /// Initializes publication record
            publication_record()
                : nRequest( req_EmptyRecord )
                , nState( inactive )
                , nAge( 0 )
                , pNext( nullptr )
                , pOwner( nullptr )
            {}

            /// Returns the value of \p nRequest field
            unsigned int op() const
            {
                return nRequest.load( atomics::memory_order_relaxed );
            }

            /// Checks if the operation is done
            bool is_done() const
            {
                return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
            }
        };

        /// Flat combining internal statistics
        template <typename Counter = cds::atomicity::event_counter >
        struct stat
        {
            typedef Counter counter_type;   ///< Event counter type

            counter_type    m_nOperationCount   ;   ///< How many operations have been performed
            counter_type    m_nCombiningCount   ;   ///< Combining call count
            counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
            counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
            counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
            counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
            counter_type    m_nPubRecordDeleted ;   ///< Count of deleted publication records
            counter_type    m_nAcquirePubRecCount;  ///< Count of publication record acquires
            counter_type    m_nReleasePubRecCount;  ///< Count of publication record releases

            /// Returns current combining factor
            /**
                Combining factor is the average number of operations performed in one combining pass:
                <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
            */
            double combining_factor() const
            {
                return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
            }

            //@cond
            void    onOperation()               { ++m_nOperationCount; }
            void    onCombining()               { ++m_nCombiningCount; }
            void    onCompactPublicationList()  { ++m_nCompactPublicationList; }
            void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord; }
            void    onActivatePubRecord()       { ++m_nActivatePubRecord; }
            void    onCreatePubRecord()         { ++m_nPubRecordCreated; }
            void    onDeletePubRecord()         { ++m_nPubRecordDeleted; }
            void    onAcquirePubRecord()        { ++m_nAcquirePubRecCount; }
            void    onReleasePubRecord()        { ++m_nReleasePubRecCount; }
            //@endcond
        };

        /// Flat combining dummy internal statistics
        struct empty_stat
        {
            //@cond
            void    onOperation()               {}
            void    onCombining()               {}
            void    onCompactPublicationList()  {}
            void    onDeactivatePubRecord()     {}
            void    onActivatePubRecord()       {}
            void    onCreatePubRecord()         {}
            void    onDeletePubRecord()         {}
            void    onAcquirePubRecord()        {}
            void    onReleasePubRecord()        {}
            //@endcond
        };

        /// Type traits of \ref kernel class
        /**
            You can define different type traits for \ref kernel
            by specifying your struct based on \p %type_traits
            or by using \ref make_traits metafunction.
        */
        struct type_traits
        {
            typedef cds::lock::Spin             lock_type;  ///< Lock type
            typedef cds::backoff::delay_of<2>   back_off;   ///< Back-off strategy
            typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating publication_record derivatives)
            typedef empty_stat                  stat;       ///< Internal statistics
            typedef opt::v::relaxed_ordering    memory_model; ///< C++ memory ordering model
        };

        /// Metafunction converting option list to traits
        /**
            This is a wrapper for <tt> cds::opt::make_options< type_traits, Options...> </tt>
            \p Options are:
            - \p opt::lock_type - mutex type, default is \p cds::lock::Spin
            - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
            - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
            - \p opt::stat - internal statistics, possible types: \ref stat, \ref empty_stat (the default)
            - \p opt::memory_model - C++ memory ordering model.
                See opt::memory_model for the list of all available memory ordering models.
                Default is cds::opt::v::relaxed_ordering
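
            For example, the following (illustrative) typedef enables internal statistics;
            \p my_traits is an arbitrary name:
            \code
            typedef cds::algo::flat_combining::make_traits<
                cds::opt::stat< cds::algo::flat_combining::stat<> >
            >::type my_traits;
            \endcode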
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type ;   ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< type_traits, Options... >::type
                ,Options...
            >::type   type;
#   endif
        };

        /// The kernel of flat combining
        /**
            Template parameters:
            - \p PublicationRecord - a type derived from \ref publication_record
            - \p Traits - type traits of flat combining, default is flat_combining::type_traits.
                The \ref make_traits metafunction can be used to create type traits

            The kernel object should be a member of a container class. The container cooperates with the flat
            combining kernel object. There are two ways to interact with the kernel:
            - One-by-one processing of the active records of the publication list. This mode provides the \ref combine
              function: the container acquires its publication record by \ref acquire_record, fills its fields and calls
              the \p combine function of its kernel object. If the current thread becomes a combiner, the kernel
              calls the \p fc_apply function of the container for each active non-empty record. Then, the container
              should release its publication record by \ref release_record. Only one pass through the publication
              list is possible.
            - Batch processing (the \ref batch_combine function). In this mode the container obtains access
              to the entire publication list. This mode allows the container to perform elimination; for example,
              the stack can collide \p push and \p pop requests. The sequence of invocations is the following:
              the container acquires its publication record by \ref acquire_record, fills its fields and calls
              the \p batch_combine function of its kernel object. If the current thread becomes a combiner,
              the kernel calls the \p fc_process function of the container passing two iterators pointing to
              the begin and the end of the publication list (see \ref iterator class). The iterators allow
              multiple passes through the active records of the publication list. For each processed record the
              container should call the \ref operation_done function. At the end, the container should release
              its record by \ref release_record.
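
            The following is an illustrative sketch of the one-by-one (\ref combine) mode;
            \p MyRecord, \p MyContainer, \p op_push and \p do_push are hypothetical names,
            not part of the library:
            \code
            struct MyRecord: public cds::algo::flat_combining::publication_record
            {
                int nArg;   // operation argument
            };

            class MyContainer
            {
                cds::algo::flat_combining::kernel<MyRecord> m_FC;
                static unsigned int const op_push = cds::algo::flat_combining::req_Operation;
            public:
                void do_push( int v )
                {
                    MyRecord * pRec = m_FC.acquire_record();
                    pRec->nArg = v;
                    // Either combines here or waits until another combiner executes the request
                    m_FC.combine( op_push, pRec, *this );
                    m_FC.release_record( pRec );
                }

                // Called by the kernel for each active non-empty record while this
                // thread is the combiner; the kernel marks the record as done itself
                void fc_apply( MyRecord * pRec )
                {
                    // apply pRec->nArg to the underlying sequential structure ...
                }
            };
            \endcode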
        */
        template <
            typename PublicationRecord
            ,typename Traits = type_traits
        >
        class kernel
        {
        public:
            typedef PublicationRecord   publication_record_type;        ///< publication record type
            typedef Traits              type_traits;                    ///< Type traits
            typedef typename type_traits::lock_type global_lock_type;   ///< Global lock type
            typedef typename type_traits::back_off  back_off;           ///< back-off strategy type
            typedef typename type_traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
            typedef typename type_traits::stat      stat;               ///< Internal statistics
            typedef typename type_traits::memory_model memory_model;    ///< C++ memory model

        protected:
            //@cond
            typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
            typedef std::lock_guard<global_lock_type> lock_guard;
            //@endcond

        protected:
            unsigned int                m_nCount;   ///< Count of combining passes
            publication_record_type *   m_pHead;    ///< Head of publication list
            boost::thread_specific_ptr< publication_record_type >   m_pThreadRec;   ///< Thread-local publication record
            mutable global_lock_type    m_Mutex;    ///< Global mutex
            mutable stat                m_Stat;     ///< Internal statistics
            unsigned int const          m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
            unsigned int const          m_nCombinePassCount; ///< Number of combining passes

        public:
            /// Initializes the object
            /**
                Compact factor = 64

                Combiner pass count = 8
            */
            kernel()
                : m_nCount(0)
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )
                , m_nCompactFactor( 64 - 1 ) // binary mask
                , m_nCombinePassCount( 8 )
            {
                init();
            }

            /// Initializes the object
            kernel(
                unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
                ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
                )
                : m_nCount(0)
                , m_pHead( nullptr )
                , m_pThreadRec( tls_cleanup )
                , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
                , m_nCombinePassCount( nCombinePassCount )
            {
                init();
            }

            /// Destroys the object and marks all publication records as inactive
            ~kernel()
            {
                // mark all publication records as detached
                for ( publication_record * p = m_pHead; p; p = p->pNext.load( memory_model::memory_order_relaxed ))
                    p->pOwner = nullptr;
            }
            /// Gets the publication list record for the current thread
            /**
                If there is no publication record for the current thread,
                the function allocates it.
            */
            publication_record_type * acquire_record()
            {
                publication_record_type * pRec = m_pThreadRec.get();
                if ( !pRec ) {
                    // Allocate new publication record
                    pRec = cxx11_allocator().New();
                    pRec->pOwner = reinterpret_cast<void *>( this );
                    m_pThreadRec.reset( pRec );
                    m_Stat.onCreatePubRecord();
                }

                if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
                    publish( pRec );

                assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );

                m_Stat.onAcquirePubRecord();
                return pRec;
            }

            /// Marks the publication record for the current thread as empty
            void release_record( publication_record_type * pRec )
            {
                assert( pRec->is_done() );
                pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_relaxed );
                m_Stat.onReleasePubRecord();
            }

            /// Tries to execute operation \p nOpId
            /**
                \p pRec is the publication record acquired by \ref acquire_record earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result, the current thread can become a combiner or can wait until
                another combiner performs the \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_apply
                for each active non-empty publication record.
            */
            template <class Container>
            void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_combining( owner, pRec );
            }

            /// Tries to execute operation \p nOpId in batch-combine mode
            /**
                \p pRec is the publication record acquired by \ref acquire_record earlier.
                \p owner is the container that owns the flat combining kernel object.
                As a result, the current thread can become a combiner or can wait until
                another combiner performs the \p pRec operation.

                If the thread becomes a combiner, the kernel calls \p owner.fc_process
                giving the container full access to the publication list. This function
                is useful for an elimination technique if the container supports any kind
                of it. The container can perform multiple passes through the publication list.

                \p owner.fc_process has two arguments - forward iterators to the begin and the end
                of the publication list, see \ref iterator class. For each processed record the container
                should call the \ref operation_done function to mark the record as processed.

                At the end of \p %batch_combine the \ref combine function is called
                to process the rest of the publication records.
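
                An illustrative shape of the container's \p fc_process (only \ref iterator
                and \ref operation_done are library names; the rest is hypothetical):
                \code
                // Member of the container that owns the kernel object m_FC;
                // called by the kernel when this thread becomes the combiner
                template <typename Iterator>
                void fc_process( Iterator itBegin, Iterator itEnd )
                {
                    // The iterator yields only active records with a pending request
                    for ( Iterator it = itBegin; it != itEnd; ++it ) {
                        // ... apply or eliminate the operation encoded in it->op() ...
                        m_FC.operation_done( *it );     // mark the record as processed
                    }
                }
                \endcode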
            */
            template <class Container>
            void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
            {
                assert( nOpId >= req_Operation );
                assert( pRec );
                //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
                pRec->nRequest.store( nOpId, memory_model::memory_order_release );

                m_Stat.onOperation();

                try_batch_combining( owner, pRec );
            }

            /// Waits for the end of combining
            void wait_while_combining() const
            {
                lock_guard l( m_Mutex );
            }

            /// Marks \p rec as executed
            /**
                This function should be called by the container if the batch_combine mode is used.
                For usual combining (see \ref combine) this function is not needed.
            */
            void operation_done( publication_record& rec )
            {
                rec.nRequest.store( req_Response, memory_model::memory_order_release );
            }

            /// Internal statistics
            stat const& statistics() const
            {
                return m_Stat;
            }

            //@cond
            // For container classes based on flat combining
            stat& internal_statistics() const
            {
                return m_Stat;
            }
            //@endcond

            /// Returns the compact factor
            unsigned int compact_factor() const
            {
                return m_nCompactFactor + 1;
            }

            /// Returns the number of combining passes for the combiner thread
            unsigned int combine_pass_count() const
            {
                return m_nCombinePassCount;
            }

        public:
            /// Publication list iterator
            /**
                Iterators are intended for batch processing by the container's
                \p fc_process function.
                The iterator allows iterating through the active records of the publication list.
            */
            class iterator
            {
                //@cond
                friend class kernel;
                publication_record_type * m_pRec;
                //@endcond

            protected:
                //@cond
                iterator( publication_record_type * pRec )
                    : m_pRec( pRec )
                {
                    skip_inactive();
                }

                void skip_inactive()
                {
                    while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
                                    || m_pRec->nRequest.load( memory_model::memory_order_relaxed ) < req_Operation ))
                    {
                        m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    }
                }
                //@endcond

            public:
                /// Initializes an empty iterator object
                iterator()
                    : m_pRec( nullptr )
                {}

                /// Copy ctor
                iterator( iterator const& src )
                    : m_pRec( src.m_pRec )
                {}

                /// Pre-increment
                iterator& operator++()
                {
                    assert( m_pRec );
                    m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
                    skip_inactive();
                    return *this;
                }

                /// Post-increment
                iterator operator++(int)
                {
                    assert( m_pRec );
                    iterator it(*this);
                    ++(*this);
                    return it;
                }

                /// Dereference operator, can return \p nullptr
                publication_record_type * operator ->()
                {
                    return m_pRec;
                }

                /// Dereference operator, the iterator should not be an end iterator
                publication_record_type& operator*()
                {
                    assert( m_pRec );
                    return *m_pRec;
                }

                /// Iterator equality
                friend bool operator==( iterator it1, iterator it2 )
                {
                    return it1.m_pRec == it2.m_pRec;
                }

                /// Iterator inequality
                friend bool operator!=( iterator it1, iterator it2 )
                {
                    return !( it1 == it2 );
                }
            };

            /// Returns an iterator to the first active publication record
            iterator begin()    { return iterator(m_pHead); }

            /// Returns an iterator to the end of the publication list. Should not be dereferenced.
            iterator end()      { return iterator(); }

        private:
            //@cond
            static void tls_cleanup( publication_record_type * pRec )
            {
                // Thread done
                // pRec that is TLS data should be excluded from publication list
                if ( pRec ) {
                    if ( pRec->nState.load(memory_model::memory_order_relaxed) == active && pRec->pOwner ) {
                        // record is active and kernel is alive
                        unsigned int nState = active;
                        pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
                    }
                    else {
                        // record is not in publication list or kernel already deleted
                        cxx11_allocator().Delete( pRec );
                    }
                }
            }

            void init()
            {
                assert( m_pThreadRec.get() == nullptr );
                publication_record_type * pRec = cxx11_allocator().New();
                m_pHead = pRec;
                pRec->pOwner = this;
                m_pThreadRec.reset( pRec );
                m_Stat.onCreatePubRecord();
            }

            void publish( publication_record_type * pRec )
            {
                assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );

                pRec->nAge = m_nCount;
                pRec->nState.store( active, memory_model::memory_order_release );

                // Insert record to publication list
                if ( m_pHead != static_cast<publication_record *>(pRec) ) {
                    publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
                    if ( p != static_cast<publication_record *>( pRec )) {
                        do {
                            pRec->pNext = p;
                            // Failed CAS changes p
                        } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
                            memory_model::memory_order_release, atomics::memory_order_relaxed ));
                        m_Stat.onActivatePubRecord();
                    }
                }
            }

            void republish( publication_record_type * pRec )
            {
                if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
                    // The record has been excluded from publication list. Reinsert it
                    publish( pRec );
                }
            }

            template <class Container>
            void try_combining( Container& owner, publication_record_type * pRec )
            {
                if ( m_Mutex.try_lock() ) {
                    // The thread becomes a combiner
                    lock_guard l( m_Mutex, std::adopt_lock );

                    // The record pRec can be excluded from publication list. Re-publish it
                    republish( pRec );

                    combining( owner );
                    assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                }
                else {
                    // There is another combiner, wait while it executes our request
                    if ( !wait_for_combining( pRec ) ) {
                        // The thread becomes a combiner
                        lock_guard l( m_Mutex, std::adopt_lock );

                        // The record pRec can be excluded from publication list. Re-publish it
                        republish( pRec );

                        combining( owner );
                        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                    }
                }
            }

            template <class Container>
            void try_batch_combining( Container& owner, publication_record_type * pRec )
            {
                if ( m_Mutex.try_lock() ) {
                    // The thread becomes a combiner
                    lock_guard l( m_Mutex, std::adopt_lock );

                    // The record pRec can be excluded from publication list. Re-publish it
                    republish( pRec );

                    batch_combining( owner );
                    assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                }
                else {
                    // There is another combiner, wait while it executes our request
                    if ( !wait_for_combining( pRec ) ) {
                        // The thread becomes a combiner
                        lock_guard l( m_Mutex, std::adopt_lock );

                        // The record pRec can be excluded from publication list. Re-publish it
                        republish( pRec );

                        batch_combining( owner );
                        assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
                    }
                }
            }

            template <class Container>
            void combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock() );

                unsigned int const nCurAge = ++m_nCount;

                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
                    if ( !combining_pass( owner, nCurAge ))
                        break;

                m_Stat.onCombining();
                if ( (nCurAge & m_nCompactFactor) == 0 )
                    compact_list( nCurAge );
            }

            template <class Container>
            bool combining_pass( Container& owner, unsigned int nCurAge )
            {
                publication_record * pPrev = nullptr;
                publication_record * p = m_pHead;
                bool bOpDone = false;
                while ( p ) {
                    switch ( p->nState.load( memory_model::memory_order_acquire )) {
                        case active:
                            if ( p->op() >= req_Operation ) {
                                p->nAge = nCurAge;
                                owner.fc_apply( static_cast<publication_record_type *>(p) );
                                operation_done( *p );
                                bOpDone = true;
                            }
                            break;
                        case inactive:
                            // Only m_pHead can be inactive in the publication list
                            assert( p == m_pHead );
                            break;
                        case removed:
                            // The record should be removed
                            p = unlink_and_delete_record( pPrev, p );
                            continue;
                        default:
                            // any other state is impossible
                            assert(false);
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }
                return bOpDone;
            }

            template <class Container>
            void batch_combining( Container& owner )
            {
                // The thread is a combiner
                assert( !m_Mutex.try_lock() );

                unsigned int const nCurAge = ++m_nCount;

                for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
                    owner.fc_process( begin(), end() );

                combining_pass( owner, nCurAge );
                m_Stat.onCombining();
                if ( (nCurAge & m_nCompactFactor) == 0 )
                    compact_list( nCurAge );
            }

            bool wait_for_combining( publication_record_type * pRec )
            {
                back_off bkoff;
                while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {

                    // The record can be excluded from publication list. Reinsert it
                    republish( pRec );

                    bkoff();

                    if ( m_Mutex.try_lock() ) {
                        if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
                            m_Mutex.unlock();
                            break;
                        }
                        // The thread becomes a combiner
                        return false;
                    }
                }
                return true;
            }

            void compact_list( unsigned int const nCurAge )
            {
                // Thinning publication list
                publication_record * pPrev = nullptr;
                for ( publication_record * p = m_pHead; p; ) {
                    if ( p->nState.load( memory_model::memory_order_acquire ) == active && p->nAge + m_nCompactFactor < nCurAge ) {
                        if ( pPrev ) {
                            publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                            if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                                memory_model::memory_order_release, atomics::memory_order_relaxed ))
                            {
                                p->nState.store( inactive, memory_model::memory_order_release );
                                p = pNext;
                                m_Stat.onDeactivatePubRecord();
                                continue;
                            }
                        }
                    }
                    pPrev = p;
                    p = p->pNext.load( memory_model::memory_order_acquire );
                }

                m_Stat.onCompactPublicationList();
            }

            publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
            {
                if ( pPrev ) {
                    publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
                    if ( pPrev->pNext.compare_exchange_strong( p, pNext,
                        memory_model::memory_order_release, atomics::memory_order_relaxed ))
                    {
                        cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
                        m_Stat.onDeletePubRecord();
                    }
                    return pNext;
                }
                else {
                    m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
                    cxx11_allocator().Delete( static_cast<publication_record_type *>( p ));
                    m_Stat.onDeletePubRecord();
                    return m_pHead;
                }
            }
            //@endcond
        };

        //@cond
        class container
        {
        public:
            template <typename PubRecord>
            void fc_apply( PubRecord * )
            {
                assert( false );
            }

            template <typename Iterator>
            void fc_process( Iterator, Iterator )
            {
                assert( false );
            }
        };
        //@endcond

    } // namespace flat_combining
}} // namespace cds::algo

#endif // #ifndef __CDS_ALGO_FLAT_COMBINING_H