Fixed use-after-free bug
[libcds.git] / cds / algo / flat_combining / kernel.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
33
34 #include <cds/algo/flat_combining/defs.h>
35 #include <cds/algo/flat_combining/wait_strategy.h>
36
37 #include <cds/sync/spinlock.h>
38 #include <cds/details/allocator.h>
39 #include <cds/opt/options.h>
40 #include <cds/algo/int_algo.h>
41
42 namespace cds { namespace algo {
43
44     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
45     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
46
47     /// Flat combining
48     /**
49         @anchor cds_flat_combining_description
50         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
51         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
52         The technique converts a sequential data structure to its concurrent implementation.
53         A few structures are added to the sequential implementation: a <i>global lock</i>,
54         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
55         of a <i>publication list</i>. The publication list is a list of thread-local records
56         of a size proportional to the number of threads that are concurrently accessing the shared object.
57
58         Each thread \p t accessing the structure to perform an invocation of some method \p f()
59         on the shared object executes the following sequence of steps:
60         <ol>
61         <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
62         sequentially to the shared object in the <i>request</i> field of your thread local publication
63         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
64         be used to receive the response. If your thread local publication record is marked as active
65         continue to step 2, otherwise continue to step 5.</li>
66         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
67         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
68         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
69         record is active (you may use any of \p wait_strategy instead of spinning). If your record is inactive proceed to step 5.
70         Once the response is available, reset the request field to null and return the response.</li>
71         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
72         return to spinning in step 2.</li>
73         <li>Otherwise, you hold the lock and are a combiner.
74         <ul>
75             <li>Increment the combining pass count by one.</li>
76             <li>Execute a \p fc_apply() by traversing the publication list from the head,
77             combining all non-null method call invocations, setting the <i>age</i> of each of these records
78             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
79             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
80             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
81             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
82             by the head in the list), remove from the publication list all records whose <i>age</i> is
83             much smaller than the current <i>count</i>. This is done by removing the node and marking it
84             as inactive.</li>
85             <li>Release the lock.</li>
86         </ul>
87         <li>If you have no thread local publication record allocate one, marked as active. If you already
88         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
89         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
90         </ol>
91
92         As the test results show, the flat combining technique is suitable for non-intrusive containers
93         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
94         less impressive results.
95
96         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
97
98         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
99     */
100     namespace flat_combining {
101
102         /// Flat combining internal statistics
103         template <typename Counter = cds::atomicity::event_counter >
104         struct stat
105         {
106             typedef Counter counter_type;   ///< Event counter type
107
108             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
109             counter_type    m_nCombiningCount   ;   ///< Combining call count
110             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
111             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
112             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
113             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
114             counter_type    m_nPubRecordDeleted ;   ///< Count of deleted publication records
115             counter_type    m_nPassiveWaitCall;     ///< Count of passive waiting call (\p kernel::wait_for_combining())
116             counter_type    m_nPassiveWaitIteration;///< Count of iteration inside passive waiting
117             counter_type    m_nPassiveWaitWakeup;   ///< Count of forcing wake-up of passive wait cycle
118             counter_type    m_nInvokeExclusive;     ///< Count of call \p kernel::invoke_exclusive()
119             counter_type    m_nWakeupByNotifying;   ///< How many times the passive thread be waked up by a notification
120             counter_type    m_nPassiveToCombiner;   ///< How many times the passive thread becomes the combiner
121
122             /// Returns current combining factor
123             /**
124                 Combining factor is how many operations perform in one combine pass:
125                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
126             */
127             double combining_factor() const
128             {
129                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
130             }
131
132             //@cond
133             void    onOperation()               { ++m_nOperationCount;          }
134             void    onCombining()               { ++m_nCombiningCount;          }
135             void    onCompactPublicationList()  { ++m_nCompactPublicationList;  }
136             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord;     }
137             void    onActivatePubRecord()       { ++m_nActivatePubRecord;       }
138             void    onCreatePubRecord()         { ++m_nPubRecordCreated;        }
139             void    onDeletePubRecord()         { ++m_nPubRecordDeleted;        }
140             void    onPassiveWait()             { ++m_nPassiveWaitCall;         }
141             void    onPassiveWaitIteration()    { ++m_nPassiveWaitIteration;    }
142             void    onPassiveWaitWakeup()       { ++m_nPassiveWaitWakeup;       }
143             void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
144             void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
145             void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
146
147             //@endcond
148         };
149
150         /// Flat combining dummy internal statistics
151         struct empty_stat
152         {
153             //@cond
154             void    onOperation()               const {}
155             void    onCombining()               const {}
156             void    onCompactPublicationList()  const {}
157             void    onDeactivatePubRecord()     const {}
158             void    onActivatePubRecord()       const {}
159             void    onCreatePubRecord()         const {}
160             void    onDeletePubRecord()         const {}
161             void    onPassiveWait()             const {}
162             void    onPassiveWaitIteration()    const {}
163             void    onPassiveWaitWakeup()       const {}
164             void    onInvokeExclusive()         const {}
165             void    onWakeupByNotifying()       const {}
166             void    onPassiveToCombiner()       const {}
167             //@endcond
168         };
169
170         /// Type traits of \ref kernel class
171         /**
172             You can define different type traits for \ref kernel
173             by specifying your struct based on \p %traits
174             or by using \ref make_traits metafunction.
175         */
176         struct traits
177         {
178             typedef cds::sync::spin             lock_type;  ///< Lock type
179             typedef cds::algo::flat_combining::wait_strategy::backoff< cds::backoff::delay_of<2>> wait_strategy; ///< Wait strategy
180             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating \p publication_record derivatives)
181             typedef empty_stat                  stat;       ///< Internal statistics
182             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
183         };
184
185         /// Metafunction converting option list to traits
186         /**
187             \p Options are:
188             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
189             - \p opt::wait_strategy - wait strategy, see \p wait_strategy namespace, default is \p wait_strategy::backoff.
190             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
191             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
192             - \p opt::memory_model - C++ memory ordering model.
193                 List of all available memory ordering see \p opt::memory_model.
194                 Default is \p cds::opt::v::relaxed_ordering
195         */
196         template <typename... Options>
197         struct make_traits {
198 #   ifdef CDS_DOXYGEN_INVOKED
199             typedef implementation_defined type ;   ///< Metafunction result
200 #   else
201             typedef typename cds::opt::make_options<
202                 typename cds::opt::find_type_traits< traits, Options... >::type
203                 ,Options...
204             >::type   type;
205 #   endif
206         };
207
208         /// The kernel of flat combining
209         /**
210             Template parameters:
211             - \p PublicationRecord - a type derived from \ref publication_record
212             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
213                 \ref make_traits metafunction can be used to create type traits
214
215             The kernel object should be a member of a container class. The container cooperates with flat combining
216             kernel object. There are two ways to interact with the kernel:
217             - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
218               the container acquires its publication record by \p acquire_record(), fills its fields and calls
219               \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
220               calls \p fc_apply() function of the container for each active non-empty record. Then, the container
221               should release its publication record by \p release_record(). Only one pass through the publication
222               list is possible.
223             - Batch processing - \p batch_combine() function. It this mode the container obtains access
224               to entire publication list. This mode allows the container to perform an elimination, for example,
225               the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
226               the container acquires its publication record by \p acquire_record(), fills its field and call
227               \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
228               the kernel calls \p fc_process() function of the container passing two iterators pointing to
229               the begin and the end of publication list (see \ref iterator class). The iterators allow
230               multiple pass through active records of publication list. For each processed record the container
231               should call \p operation_done() function. On the end, the container should release
232               its record by \p release_record().
233         */
234         template <
235             typename PublicationRecord
236             ,typename Traits = traits
237         >
238         class kernel
239         {
240         public:
241             typedef Traits   traits;                               ///< Type traits
242             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
243             typedef typename traits::wait_strategy wait_strategy;  ///< Wait strategy type
244             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
245             typedef typename traits::stat      stat;               ///< Internal statistics
246             typedef typename traits::memory_model memory_model;    ///< C++ memory model
247
248             typedef typename wait_strategy::template make_publication_record<PublicationRecord>::type publication_record_type; ///< Publication record type
249
250         protected:
251             //@cond
252             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
253             typedef std::lock_guard<global_lock_type> lock_guard;
254             //@endcond
255
256         protected:
257             atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
258             publication_record_type *   m_pHead;    ///< Head of publication list
259             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
260             mutable global_lock_type    m_Mutex;    ///< Global mutex
261             mutable stat                m_Stat;     ///< Internal statistics
262             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
263             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
264             wait_strategy               m_waitStrategy;      ///< Wait strategy
265
266         public:
267             /// Initializes the object
268             /**
269                 Compact factor = 1024
270
271                 Combiner pass count = 8
272             */
273             kernel()
274                 : kernel( 1024, 8 )
275             {}
276
277             /// Initializes the object
278             kernel(
279                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
280                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
281                 )
282                 : m_nCount(0)
283                 , m_pHead( nullptr )
284                 , m_pThreadRec( tls_cleanup )
285                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
286                 , m_nCombinePassCount( nCombinePassCount )
287             {
288                 init();
289             }
290
291             /// Destroys the objects and mark all publication records as inactive
292             ~kernel()
293             {
294                 m_pThreadRec.reset();   // calls tls_cleanup()
295
296                 // delete all publication records
297                 for ( publication_record* p = m_pHead; p; ) {
298                     publication_record * pRec = p;
299                     p = p->pNext.load( memory_model::memory_order_relaxed );
300                     free_publication_record( static_cast<publication_record_type *>( pRec ));
301                 }
302             }
303
304             /// Gets publication list record for the current thread
305             /**
306                 If there is no publication record for the current thread
307                 the function allocates it.
308             */
309             publication_record_type * acquire_record()
310             {
311                 publication_record_type * pRec = m_pThreadRec.get();
312                 if ( !pRec ) {
313                     // Allocate new publication record
314                     pRec = cxx11_allocator().New();
315                     m_pThreadRec.reset( pRec );
316                     m_Stat.onCreatePubRecord();
317                 }
318
319                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
320                     publish( pRec );
321
322                 assert( pRec->op() == req_EmptyRecord );
323
324                 return pRec;
325             }
326
327             /// Marks publication record for the current thread as empty
328             void release_record( publication_record_type * pRec )
329             {
330                 assert( pRec->is_done());
331                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
332             }
333
334             /// Trying to execute operation \p nOpId
335             /**
336                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
337                 \p owner is a container that is owner of flat combining kernel object.
338                 As a result the current thread can become a combiner or can wait for
339                 another combiner performs \p pRec operation.
340
341                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
342                 for each active non-empty publication record.
343             */
344             template <class Container>
345             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
346             {
347                 assert( nOpId >= req_Operation );
348                 assert( pRec );
349
350                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
351                 m_Stat.onOperation();
352
353                 try_combining( owner, pRec );
354             }
355
356             /// Trying to execute operation \p nOpId in batch-combine mode
357             /**
358                 \p pRec is the publication record acquiring by \p acquire_record() earlier.
359                 \p owner is a container that owns flat combining kernel object.
360                 As a result the current thread can become a combiner or can wait for
361                 another combiner performs \p pRec operation.
362
363                 If the thread becomes a combiner, the kernel calls \p owner.fc_process()
364                 giving the container the full access over publication list. This function
365                 is useful for an elimination technique if the container supports any kind of
366                 that. The container can perform multiple pass through publication list.
367
368                 \p owner.fc_process() has two arguments - forward iterators on begin and end of
369                 publication list, see \ref iterator class. For each processed record the container
370                 should call \p operation_done() function to mark the record as processed.
371
372                 On the end of \p %batch_combine the \p combine() function is called
373                 to process rest of publication records.
374             */
375             template <class Container>
376             void batch_combine( unsigned int nOpId, publication_record_type* pRec, Container& owner )
377             {
378                 assert( nOpId >= req_Operation );
379                 assert( pRec );
380
381                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
382                 m_Stat.onOperation();
383
384                 try_batch_combining( owner, pRec );
385             }
386
387             /// Invokes \p Func in exclusive mode
388             /**
389                 Some operation in flat combining containers should be called in exclusive mode
390                 i.e the current thread should become the combiner to process the operation.
391                 The typical example is \p empty() function.
392
393                 \p %invoke_exclusive() allows do that: the current thread becomes the combiner,
394                 invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
395                 Instead, after end of \p f call the current thread wakes up a pending thread if any.
396             */
397             template <typename Func>
398             void invoke_exclusive( Func f )
399             {
400                 {
401                     lock_guard l( m_Mutex );
402                     f();
403                 }
404                 m_waitStrategy.wakeup( *this );
405                 m_Stat.onInvokeExclusive();
406             }
407
408             /// Marks \p rec as executed
409             /**
410                 This function should be called by container if \p batch_combine mode is used.
411                 For usual combining (see \p combine()) this function is excess.
412             */
413             void operation_done( publication_record& rec )
414             {
415                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
416                 m_waitStrategy.notify( *this, static_cast<publication_record_type&>( rec ));
417             }
418
419             /// Internal statistics
420             stat const& statistics() const
421             {
422                 return m_Stat;
423             }
424
425             //@cond
426             // For container classes based on flat combining
427             stat& internal_statistics() const
428             {
429                 return m_Stat;
430             }
431             //@endcond
432
433             /// Returns the compact factor
434             unsigned int compact_factor() const
435             {
436                 return m_nCompactFactor + 1;
437             }
438
439             /// Returns number of combining passes for combiner thread
440             unsigned int combine_pass_count() const
441             {
442                 return m_nCombinePassCount;
443             }
444
445         public:
446             /// Publication list iterator
447             /**
448                 Iterators are intended for batch processing by container's
449                 \p fc_process function.
450                 The iterator allows iterate through active publication list.
451             */
452             class iterator
453             {
454                 //@cond
455                 friend class kernel;
456                 publication_record_type * m_pRec;
457                 //@endcond
458
459             protected:
460                 //@cond
461                 iterator( publication_record_type * pRec )
462                     : m_pRec( pRec )
463                 {
464                     skip_inactive();
465                 }
466
467                 void skip_inactive()
468                 {
469                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
470                                     || m_pRec->op( memory_model::memory_order_relaxed) < req_Operation ))
471                     {
472                         m_pRec = static_cast<publication_record_type*>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
473                     }
474                 }
475                 //@endcond
476
477             public:
478                 /// Initializes an empty iterator object
479                 iterator()
480                     : m_pRec( nullptr )
481                 {}
482
483                 /// Copy ctor
484                 iterator( iterator const& src )
485                     : m_pRec( src.m_pRec )
486                 {}
487
488                 /// Pre-increment
489                 iterator& operator++()
490                 {
491                     assert( m_pRec );
492                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
493                     skip_inactive();
494                     return *this;
495                 }
496
497                 /// Post-increment
498                 iterator operator++(int)
499                 {
500                     assert( m_pRec );
501                     iterator it(*this);
502                     ++(*this);
503                     return it;
504                 }
505
506                 /// Dereference operator, can return \p nullptr
507                 publication_record_type* operator ->()
508                 {
509                     return m_pRec;
510                 }
511
512                 /// Dereference operator, the iterator should not be an end iterator
513                 publication_record_type& operator*()
514                 {
515                     assert( m_pRec );
516                     return *m_pRec;
517                 }
518
519                 /// Iterator equality
520                 friend bool operator==( iterator it1, iterator it2 )
521                 {
522                     return it1.m_pRec == it2.m_pRec;
523                 }
524
525                 /// Iterator inequality
526                 friend bool operator!=( iterator it1, iterator it2 )
527                 {
528                     return !( it1 == it2 );
529                 }
530             };
531
532             /// Returns an iterator to the first active publication record
533             iterator begin()    { return iterator(m_pHead); }
534
535             /// Returns an iterator to the end of publication list. Should not be dereferenced.
536             iterator end()      { return iterator(); }
537
538         public:
539             /// Gets current value of \p rec.nRequest
540             /**
541                 This function is intended for invoking from a wait strategy
542             */
543             int get_operation( publication_record& rec )
544             {
545                 return rec.op( memory_model::memory_order_acquire );
546             }
547
548             /// Wakes up any waiting thread
549             /**
550                 This function is intended for invoking from a wait strategy
551             */
552             void wakeup_any()
553             {
554                 publication_record* pRec = m_pHead;
555                 while ( pRec ) {
556                     if ( pRec->nState.load( memory_model::memory_order_acquire ) == active
557                       && pRec->op( memory_model::memory_order_acquire ) >= req_Operation )
558                     {
559                         m_waitStrategy.notify( *this, static_cast<publication_record_type&>( *pRec ));
560                         break;
561                     }
562                     pRec = pRec->pNext.load( memory_model::memory_order_acquire );
563                 }
564             }
565
566         private:
567             //@cond
568             static void tls_cleanup( publication_record_type* pRec )
569             {
570                 // Thread done
571                 // pRec that is TLS data should be excluded from publication list
572                 pRec->nState.store( removed, memory_model::memory_order_release );
573             }
574
575             static void free_publication_record( publication_record_type* pRec )
576             {
577                 cxx11_allocator().Delete( pRec );
578             }
579
580             void init()
581             {
582                 assert( m_pThreadRec.get() == nullptr );
583                 publication_record_type* pRec = cxx11_allocator().New();
584                 m_pHead = pRec;
585                 m_pThreadRec.reset( pRec );
586                 m_Stat.onCreatePubRecord();
587             }
588
589             void publish( publication_record_type* pRec )
590             {
591                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
592
593                 pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
594                 pRec->nState.store( active, memory_model::memory_order_release );
595
596                 // Insert record to publication list
597                 if ( m_pHead != static_cast<publication_record *>(pRec)) {
598                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
599                     if ( p != static_cast<publication_record *>( pRec )) {
600                         do {
601                             pRec->pNext = p;
602                             // Failed CAS changes p
603                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
604                             memory_model::memory_order_release, atomics::memory_order_relaxed ));
605                         m_Stat.onActivatePubRecord();
606                     }
607                 }
608             }
609
610             void republish( publication_record_type* pRec )
611             {
612                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
613                     // The record has been excluded from publication list. Reinsert it
614                     publish( pRec );
615                 }
616             }
617
618             template <class Container>
619             void try_combining( Container& owner, publication_record_type* pRec )
620             {
621                 if ( m_Mutex.try_lock()) {
622                     // The thread becomes a combiner
623                     lock_guard l( m_Mutex, std::adopt_lock_t());
624
625                     // The record pRec can be excluded from publication list. Re-publish it
626                     republish( pRec );
627
628                     combining( owner );
629                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
630                 }
631                 else {
632                     // There is another combiner, wait while it executes our request
633                     if ( !wait_for_combining( pRec )) {
634                         // The thread becomes a combiner
635                         lock_guard l( m_Mutex, std::adopt_lock_t());
636
637                         // The record pRec can be excluded from publication list. Re-publish it
638                         republish( pRec );
639
640                         combining( owner );
641                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
642                     }
643                 }
644             }
645
646             template <class Container>
647             void try_batch_combining( Container& owner, publication_record_type * pRec )
648             {
649                 if ( m_Mutex.try_lock()) {
650                     // The thread becomes a combiner
651                     lock_guard l( m_Mutex, std::adopt_lock_t());
652
653                     // The record pRec can be excluded from publication list. Re-publish it
654                     republish( pRec );
655
656                     batch_combining( owner );
657                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
658                 }
659                 else {
660                     // There is another combiner, wait while it executes our request
661                     if ( !wait_for_combining( pRec )) {
662                         // The thread becomes a combiner
663                         lock_guard l( m_Mutex, std::adopt_lock_t());
664
665                         // The record pRec can be excluded from publication list. Re-publish it
666                         republish( pRec );
667
668                         batch_combining( owner );
669                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
670                     }
671                 }
672             }
673
674             template <class Container>
675             void combining( Container& owner )
676             {
677                 // The thread is a combiner
678                 assert( !m_Mutex.try_lock());
679
680                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
681
682                 unsigned int nEmptyPassCount = 0;
683                 unsigned int nUsefulPassCount = 0;
684                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) {
685                     if ( combining_pass( owner, nCurAge ))
686                         ++nUsefulPassCount;
687                     else if ( ++nEmptyPassCount > nUsefulPassCount )
688                         break;
689                 }
690
691                 m_Stat.onCombining();
692                 if ( (nCurAge & m_nCompactFactor) == 0 )
693                     compact_list( nCurAge );
694             }
695
696             template <class Container>
697             bool combining_pass( Container& owner, unsigned int nCurAge )
698             {
699                 publication_record* pPrev = nullptr;
700                 publication_record* p = m_pHead;
701                 bool bOpDone = false;
702                 while ( p ) {
703                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
704                         case active:
705                             if ( p->op() >= req_Operation ) {
706                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
707                                 owner.fc_apply( static_cast<publication_record_type*>(p));
708                                 operation_done( *p );
709                                 bOpDone = true;
710                             }
711                             break;
712                         case inactive:
713                             // Only m_pHead can be inactive in the publication list
714                             assert( p == m_pHead );
715                             break;
716                         case removed:
717                             // The record should be removed
718                             p = unlink_and_delete_record( pPrev, p );
719                             continue;
720                         default:
721                             /// ??? That is impossible
722                             assert(false);
723                     }
724                     pPrev = p;
725                     p = p->pNext.load( memory_model::memory_order_acquire );
726                 }
727                 return bOpDone;
728             }
729
730             template <class Container>
731             void batch_combining( Container& owner )
732             {
733                 // The thread is a combiner
734                 assert( !m_Mutex.try_lock());
735
736                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
737
738                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
739                     owner.fc_process( begin(), end());
740
741                 combining_pass( owner, nCurAge );
742                 m_Stat.onCombining();
743                 if ( (nCurAge & m_nCompactFactor) == 0 )
744                     compact_list( nCurAge );
745             }
746
747             bool wait_for_combining( publication_record_type * pRec )
748             {
749                 m_waitStrategy.prepare( *pRec );
750                 m_Stat.onPassiveWait();
751
752                 while ( pRec->op( memory_model::memory_order_acquire ) != req_Response ) {
753                     // The record can be excluded from publication list. Reinsert it
754                     republish( pRec );
755
756                     m_Stat.onPassiveWaitIteration();
757
758                     // Wait while operation processing
759                     if ( m_waitStrategy.wait( *this, *pRec ))
760                         m_Stat.onWakeupByNotifying();
761
762                     if ( m_Mutex.try_lock()) {
763                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
764                             // Operation is done
765                             m_Mutex.unlock();
766
767                             // Wake up a pending threads
768                             m_waitStrategy.wakeup( *this );
769                             m_Stat.onPassiveWaitWakeup();
770
771                             break;
772                         }
773                         // The thread becomes a combiner
774                         m_Stat.onPassiveToCombiner();
775                         return false;
776                     }
777                 }
778                 return true;
779             }
780
781             void compact_list( unsigned int const nCurAge )
782             {
783                 // Thinning publication list
784                 publication_record * pPrev = nullptr;
785                 for ( publication_record * p = m_pHead; p; ) {
786                     if ( p->nState.load( memory_model::memory_order_acquire ) == active
787                       && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
788                     {
789                         if ( pPrev ) {
790                             publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
791                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
792                                 memory_model::memory_order_release, atomics::memory_order_relaxed ))
793                             {
794                                 p->nState.store( inactive, memory_model::memory_order_release );
795                                 p = pNext;
796                                 m_Stat.onDeactivatePubRecord();
797                                 continue;
798                             }
799                         }
800                     }
801                     pPrev = p;
802                     p = p->pNext.load( memory_model::memory_order_acquire );
803                 }
804
805                 m_Stat.onCompactPublicationList();
806             }
807
808             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
809             {
810                 if ( pPrev ) {
811                     publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
812                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
813                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
814                     {
815                         free_publication_record( static_cast<publication_record_type *>( p ));
816                         m_Stat.onDeletePubRecord();
817                     }
818                     return pNext;
819                 }
820                 else {
821                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
822                     free_publication_record( static_cast<publication_record_type *>( p ));
823                     m_Stat.onDeletePubRecord();
824                     return m_pHead;
825                 }
826             }
827             //@endcond
828         };
829
830         //@cond
831         class container
832         {
833         public:
834             template <typename PubRecord>
835             void fc_apply( PubRecord * )
836             {
837                 assert( false );
838             }
839
840             template <typename Iterator>
841             void fc_process( Iterator, Iterator )
842             {
843                 assert( false );
844             }
845         };
846         //@endcond
847
848     } // namespace flat_combining
849 }} // namespace cds::algo
850
851 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H