Fixed memory leaks (experimental)
[libcds.git] / cds / algo / flat_combining / kernel.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2016
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
33
34 #include <cds/algo/flat_combining/defs.h>
35 #include <cds/algo/flat_combining/wait_strategy.h>
36
37 #include <cds/sync/spinlock.h>
38 #include <cds/details/allocator.h>
39 #include <cds/opt/options.h>
40 #include <cds/algo/int_algo.h>
41
42 namespace cds { namespace algo {
43
44     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
45     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
46
47     /// Flat combining
48     /**
49         @anchor cds_flat_combining_description
50         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
51         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
52         The technique converts a sequential data structure to its concurrent implementation.
53         A few structures are added to the sequential implementation: a <i>global lock</i>,
54         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
55         of a <i>publication list</i>. The publication list is a list of thread-local records
56         of a size proportional to the number of threads that are concurrently accessing the shared object.
57
58         Each thread \p t accessing the structure to perform an invocation of some method \p f()
59         on the shared object executes the following sequence of steps:
60         <ol>
61         <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
62         sequentially to the shared object in the <i>request</i> field of your thread local publication
63         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
64         be used to receive the response. If your thread local publication record is marked as active
65         continue to step 2, otherwise continue to step 5.</li>
66         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
67         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
68         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
69         record is active (you may use any of \p wait_strategy instead of spinning). If your record is inactive proceed to step 5.
70         Once the response is available, reset the request field to null and return the response.</li>
71         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
72         return to spinning in step 2.</li>
73         <li>Otherwise, you hold the lock and are a combiner.
74         <ul>
75             <li>Increment the combining pass count by one.</li>
76             <li>Execute a \p fc_apply() by traversing the publication list from the head,
77             combining all non-null method call invocations, setting the <i>age</i> of each of these records
78             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
79             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
80             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
81             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
82             by the head in the list), remove from the publication list all records whose <i>age</i> is
83             much smaller than the current <i>count</i>. This is done by removing the node and marking it
84             as inactive.</li>
85             <li>Release the lock.</li>
86         </ul>
87         <li>If you have no thread local publication record allocate one, marked as active. If you already
88         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
89         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
90         </ol>
91
92         As the test results show, the flat combining technique is suitable for non-intrusive containers
93         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
94         less impressive results.
95
96         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
97
98         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
99     */
100     namespace flat_combining {
101
102         /// Flat combining internal statistics
103         template <typename Counter = cds::atomicity::event_counter >
104         struct stat
105         {
106             typedef Counter counter_type;   ///< Event counter type
107
108             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
109             counter_type    m_nCombiningCount   ;   ///< Combining call count
110             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
111             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
112             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
113             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
114             counter_type    m_nPubRecordDeleted ;   ///< Count of deleted publication records
115             counter_type    m_nPassiveWaitCall;     ///< Count of passive waiting call (\p kernel::wait_for_combining())
116             counter_type    m_nPassiveWaitIteration;///< Count of iteration inside passive waiting
117             counter_type    m_nPassiveWaitWakeup;   ///< Count of forcing wake-up of passive wait cycle
118             counter_type    m_nInvokeExclusive;     ///< Count of call \p kernel::invoke_exclusive()
119             counter_type    m_nWakeupByNotifying;   ///< How many times the passive thread be waked up by a notification
120             counter_type    m_nPassiveToCombiner;   ///< How many times the passive thread becomes the combiner
121
122             /// Returns current combining factor
123             /**
124                 Combining factor is how many operations perform in one combine pass:
125                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
126             */
127             double combining_factor() const
128             {
129                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
130             }
131
132             //@cond
133             void    onOperation()               { ++m_nOperationCount;          }
134             void    onCombining()               { ++m_nCombiningCount;          }
135             void    onCompactPublicationList()  { ++m_nCompactPublicationList;  }
136             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord;     }
137             void    onActivatePubRecord()       { ++m_nActivatePubRecord;       }
138             void    onCreatePubRecord()         { ++m_nPubRecordCreated;        }
139             void    onDeletePubRecord()         { ++m_nPubRecordDeleted;        }
140             void    onPassiveWait()             { ++m_nPassiveWaitCall;         }
141             void    onPassiveWaitIteration()    { ++m_nPassiveWaitIteration;    }
142             void    onPassiveWaitWakeup()       { ++m_nPassiveWaitWakeup;       }
143             void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
144             void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
145             void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
146
147             //@endcond
148         };
149
150         /// Flat combining dummy internal statistics
151         struct empty_stat
152         {
153             //@cond
154             void    onOperation()               const {}
155             void    onCombining()               const {}
156             void    onCompactPublicationList()  const {}
157             void    onDeactivatePubRecord()     const {}
158             void    onActivatePubRecord()       const {}
159             void    onCreatePubRecord()         const {}
160             void    onDeletePubRecord()         const {}
161             void    onPassiveWait()             const {}
162             void    onPassiveWaitIteration()    const {}
163             void    onPassiveWaitWakeup()       const {}
164             void    onInvokeExclusive()         const {}
165             void    onWakeupByNotifying()       const {}
166             void    onPassiveToCombiner()       const {}
167             //@endcond
168         };
169
170         /// Type traits of \ref kernel class
171         /**
172             You can define different type traits for \ref kernel
173             by specifying your struct based on \p %traits
174             or by using \ref make_traits metafunction.
175         */
176         struct traits
177         {
178             typedef cds::sync::spin             lock_type;  ///< Lock type
179             typedef cds::algo::flat_combining::wait_strategy::backoff< cds::backoff::delay_of<2>> wait_strategy; ///< Wait strategy
180             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating \p publication_record derivatives)
181             typedef empty_stat                  stat;       ///< Internal statistics
182             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
183         };
184
185         /// Metafunction converting option list to traits
186         /**
187             \p Options are:
188             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
189             - \p opt::wait_strategy - wait strategy, see \p wait_strategy namespace, default is \p wait_strategy::backoff.
190             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
191             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
192             - \p opt::memory_model - C++ memory ordering model.
193                 List of all available memory ordering see \p opt::memory_model.
194                 Default is \p cds::opt::v::relaxed_ordering
195         */
196         template <typename... Options>
197         struct make_traits {
198 #   ifdef CDS_DOXYGEN_INVOKED
199             typedef implementation_defined type ;   ///< Metafunction result
200 #   else
201             typedef typename cds::opt::make_options<
202                 typename cds::opt::find_type_traits< traits, Options... >::type
203                 ,Options...
204             >::type   type;
205 #   endif
206         };
207
208         /// The kernel of flat combining
209         /**
210             Template parameters:
211             - \p PublicationRecord - a type derived from \ref publication_record
212             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
213                 \ref make_traits metafunction can be used to create type traits
214
215             The kernel object should be a member of a container class. The container cooperates with flat combining
216             kernel object. There are two ways to interact with the kernel:
217             - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
218               the container acquires its publication record by \p acquire_record(), fills its fields and calls
219               \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
220               calls \p fc_apply() function of the container for each active non-empty record. Then, the container
221               should release its publication record by \p release_record(). Only one pass through the publication
222               list is possible.
223             - Batch processing - \p batch_combine() function. It this mode the container obtains access
224               to entire publication list. This mode allows the container to perform an elimination, for example,
225               the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
226               the container acquires its publication record by \p acquire_record(), fills its field and call
227               \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
228               the kernel calls \p fc_process() function of the container passing two iterators pointing to
229               the begin and the end of publication list (see \ref iterator class). The iterators allow
230               multiple pass through active records of publication list. For each processed record the container
231               should call \p operation_done() function. On the end, the container should release
232               its record by \p release_record().
233         */
234         template <
235             typename PublicationRecord
236             ,typename Traits = traits
237         >
238         class kernel
239         {
240         public:
241             typedef Traits   traits;                               ///< Type traits
242             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
243             typedef typename traits::wait_strategy wait_strategy;  ///< Wait strategy type
244             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
245             typedef typename traits::stat      stat;               ///< Internal statistics
246             typedef typename traits::memory_model memory_model;    ///< C++ memory model
247
248             typedef typename wait_strategy::template make_publication_record<PublicationRecord>::type publication_record_type; ///< Publication record type
249
250         protected:
251             //@cond
252             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
253             typedef std::lock_guard<global_lock_type> lock_guard;
254             //@endcond
255
256         protected:
257             atomics::atomic<unsigned int>  m_nCount;   ///< Total count of combining passes. Used as an age.
258             publication_record_type *   m_pHead;    ///< Head of publication list
259             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
260             mutable global_lock_type    m_Mutex;    ///< Global mutex
261             mutable stat                m_Stat;     ///< Internal statistics
262             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
263             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
264             wait_strategy               m_waitStrategy;      ///< Wait strategy
265
266         public:
267             /// Initializes the object
268             /**
269                 Compact factor = 1024
270
271                 Combiner pass count = 8
272             */
273             kernel()
274                 : kernel( 1024, 8 )
275             {}
276
277             /// Initializes the object
278             kernel(
279                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
280                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
281                 )
282                 : m_nCount(0)
283                 , m_pHead( nullptr )
284                 , m_pThreadRec( tls_cleanup )
285                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
286                 , m_nCombinePassCount( nCombinePassCount )
287             {
288                 init();
289             }
290
291             /// Destroys the objects and mark all publication records as inactive
292             ~kernel()
293             {
294                 // mark all publication record as detached
295                 for ( publication_record* p = m_pHead; p; ) {
296                     p->pOwner = nullptr;
297
298                     publication_record * pRec = p;
299                     p = p->pNext.load( memory_model::memory_order_relaxed );
300                     free_publication_record( static_cast<publication_record_type *>( pRec ));
301                 }
302             }
303
304             /// Gets publication list record for the current thread
305             /**
306                 If there is no publication record for the current thread
307                 the function allocates it.
308             */
309             publication_record_type * acquire_record()
310             {
311                 publication_record_type * pRec = m_pThreadRec.get();
312                 if ( !pRec ) {
313                     // Allocate new publication record
314                     pRec = cxx11_allocator().New();
315                     pRec->pOwner = reinterpret_cast<void *>( this );
316                     m_pThreadRec.reset( pRec );
317                     m_Stat.onCreatePubRecord();
318                 }
319
320                 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
321                     publish( pRec );
322
323                 assert( pRec->op() == req_EmptyRecord );
324
325                 return pRec;
326             }
327
328             /// Marks publication record for the current thread as empty
329             void release_record( publication_record_type * pRec )
330             {
331                 assert( pRec->is_done());
332                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
333             }
334
335             /// Trying to execute operation \p nOpId
336             /**
337                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
338                 \p owner is a container that is owner of flat combining kernel object.
339                 As a result the current thread can become a combiner or can wait for
340                 another combiner performs \p pRec operation.
341
342                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
343                 for each active non-empty publication record.
344             */
345             template <class Container>
346             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
347             {
348                 assert( nOpId >= req_Operation );
349                 assert( pRec );
350
351                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
352                 m_Stat.onOperation();
353
354                 try_combining( owner, pRec );
355             }
356
357             /// Trying to execute operation \p nOpId in batch-combine mode
358             /**
359                 \p pRec is the publication record acquiring by \p acquire_record() earlier.
360                 \p owner is a container that owns flat combining kernel object.
361                 As a result the current thread can become a combiner or can wait for
362                 another combiner performs \p pRec operation.
363
364                 If the thread becomes a combiner, the kernel calls \p owner.fc_process()
365                 giving the container the full access over publication list. This function
366                 is useful for an elimination technique if the container supports any kind of
367                 that. The container can perform multiple pass through publication list.
368
369                 \p owner.fc_process() has two arguments - forward iterators on begin and end of
370                 publication list, see \ref iterator class. For each processed record the container
371                 should call \p operation_done() function to mark the record as processed.
372
373                 On the end of \p %batch_combine the \p combine() function is called
374                 to process rest of publication records.
375             */
376             template <class Container>
377             void batch_combine( unsigned int nOpId, publication_record_type* pRec, Container& owner )
378             {
379                 assert( nOpId >= req_Operation );
380                 assert( pRec );
381
382                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
383                 m_Stat.onOperation();
384
385                 try_batch_combining( owner, pRec );
386             }
387
388             /// Invokes \p Func in exclusive mode
389             /**
390                 Some operation in flat combining containers should be called in exclusive mode
391                 i.e the current thread should become the combiner to process the operation.
392                 The typical example is \p empty() function.
393
394                 \p %invoke_exclusive() allows do that: the current thread becomes the combiner,
395                 invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
396                 Instead, after end of \p f call the current thread wakes up a pending thread if any.
397             */
398             template <typename Func>
399             void invoke_exclusive( Func f )
400             {
401                 {
402                     lock_guard l( m_Mutex );
403                     f();
404                 }
405                 m_waitStrategy.wakeup( *this );
406                 m_Stat.onInvokeExclusive();
407             }
408
409             /// Marks \p rec as executed
410             /**
411                 This function should be called by container if \p batch_combine mode is used.
412                 For usual combining (see \p combine()) this function is excess.
413             */
414             void operation_done( publication_record& rec )
415             {
416                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
417                 m_waitStrategy.notify( *this, static_cast<publication_record_type&>( rec ));
418             }
419
420             /// Internal statistics
421             stat const& statistics() const
422             {
423                 return m_Stat;
424             }
425
426             //@cond
427             // For container classes based on flat combining
428             stat& internal_statistics() const
429             {
430                 return m_Stat;
431             }
432             //@endcond
433
434             /// Returns the compact factor
435             unsigned int compact_factor() const
436             {
437                 return m_nCompactFactor + 1;
438             }
439
440             /// Returns number of combining passes for combiner thread
441             unsigned int combine_pass_count() const
442             {
443                 return m_nCombinePassCount;
444             }
445
446         public:
447             /// Publication list iterator
448             /**
449                 Iterators are intended for batch processing by container's
450                 \p fc_process function.
451                 The iterator allows iterate through active publication list.
452             */
453             class iterator
454             {
455                 //@cond
456                 friend class kernel;
457                 publication_record_type * m_pRec;
458                 //@endcond
459
460             protected:
461                 //@cond
462                 iterator( publication_record_type * pRec )
463                     : m_pRec( pRec )
464                 {
465                     skip_inactive();
466                 }
467
468                 void skip_inactive()
469                 {
470                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
471                                     || m_pRec->op( memory_model::memory_order_relaxed) < req_Operation ))
472                     {
473                         m_pRec = static_cast<publication_record_type*>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
474                     }
475                 }
476                 //@endcond
477
478             public:
479                 /// Initializes an empty iterator object
480                 iterator()
481                     : m_pRec( nullptr )
482                 {}
483
484                 /// Copy ctor
485                 iterator( iterator const& src )
486                     : m_pRec( src.m_pRec )
487                 {}
488
489                 /// Pre-increment
490                 iterator& operator++()
491                 {
492                     assert( m_pRec );
493                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
494                     skip_inactive();
495                     return *this;
496                 }
497
498                 /// Post-increment
499                 iterator operator++(int)
500                 {
501                     assert( m_pRec );
502                     iterator it(*this);
503                     ++(*this);
504                     return it;
505                 }
506
507                 /// Dereference operator, can return \p nullptr
508                 publication_record_type* operator ->()
509                 {
510                     return m_pRec;
511                 }
512
513                 /// Dereference operator, the iterator should not be an end iterator
514                 publication_record_type& operator*()
515                 {
516                     assert( m_pRec );
517                     return *m_pRec;
518                 }
519
520                 /// Iterator equality
521                 friend bool operator==( iterator it1, iterator it2 )
522                 {
523                     return it1.m_pRec == it2.m_pRec;
524                 }
525
526                 /// Iterator inequality
527                 friend bool operator!=( iterator it1, iterator it2 )
528                 {
529                     return !( it1 == it2 );
530                 }
531             };
532
533             /// Returns an iterator to the first active publication record
534             iterator begin()    { return iterator(m_pHead); }
535
536             /// Returns an iterator to the end of publication list. Should not be dereferenced.
537             iterator end()      { return iterator(); }
538
539         public:
540             /// Gets current value of \p rec.nRequest
541             /**
542                 This function is intended for invoking from a wait strategy
543             */
544             int get_operation( publication_record& rec )
545             {
546                 return rec.op( memory_model::memory_order_acquire );
547             }
548
549             /// Wakes up any waiting thread
550             /**
551                 This function is intended for invoking from a wait strategy
552             */
553             void wakeup_any()
554             {
555                 publication_record* pRec = m_pHead;
556                 while ( pRec ) {
557                     if ( pRec->nState.load( memory_model::memory_order_acquire ) == active
558                       && pRec->op( memory_model::memory_order_acquire ) >= req_Operation )
559                     {
560                         m_waitStrategy.notify( *this, static_cast<publication_record_type&>( *pRec ));
561                         break;
562                     }
563                     pRec = pRec->pNext.load( memory_model::memory_order_acquire );
564                 }
565             }
566
567         private:
568             //@cond
569             static void tls_cleanup( publication_record_type* pRec )
570             {
571                 // Thread done
572                 // pRec that is TLS data should be excluded from publication list
573                 if ( pRec ) {
574                     if ( pRec->pOwner ) {
575                         // kernel is alive
576                         pRec->nState.store( removed, memory_model::memory_order_release );
577                     }
578                     else {
579                         // kernel already deleted
580                         free_publication_record( pRec );
581                     }
582                 }
583             }
584
585             static void free_publication_record( publication_record_type* pRec )
586             {
587                 cxx11_allocator().Delete( pRec );
588             }
589
590             void init()
591             {
592                 assert( m_pThreadRec.get() == nullptr );
593                 publication_record_type* pRec = cxx11_allocator().New();
594                 m_pHead = pRec;
595                 pRec->pOwner = this;
596                 m_pThreadRec.reset( pRec );
597                 m_Stat.onCreatePubRecord();
598             }
599
600             void publish( publication_record_type* pRec )
601             {
602                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
603
604                 pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_release );
605                 pRec->nState.store( active, memory_model::memory_order_release );
606
607                 // Insert record to publication list
608                 if ( m_pHead != static_cast<publication_record *>(pRec)) {
609                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
610                     if ( p != static_cast<publication_record *>( pRec )) {
611                         do {
612                             pRec->pNext = p;
613                             // Failed CAS changes p
614                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
615                             memory_model::memory_order_release, atomics::memory_order_relaxed ));
616                         m_Stat.onActivatePubRecord();
617                     }
618                 }
619             }
620
621             void republish( publication_record_type* pRec )
622             {
623                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
624                     // The record has been excluded from publication list. Reinsert it
625                     publish( pRec );
626                 }
627             }
628
629             template <class Container>
630             void try_combining( Container& owner, publication_record_type* pRec )
631             {
632                 if ( m_Mutex.try_lock()) {
633                     // The thread becomes a combiner
634                     lock_guard l( m_Mutex, std::adopt_lock_t());
635
636                     // The record pRec can be excluded from publication list. Re-publish it
637                     republish( pRec );
638
639                     combining( owner );
640                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
641                 }
642                 else {
643                     // There is another combiner, wait while it executes our request
644                     if ( !wait_for_combining( pRec )) {
645                         // The thread becomes a combiner
646                         lock_guard l( m_Mutex, std::adopt_lock_t());
647
648                         // The record pRec can be excluded from publication list. Re-publish it
649                         republish( pRec );
650
651                         combining( owner );
652                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
653                     }
654                 }
655             }
656
657             template <class Container>
658             void try_batch_combining( Container& owner, publication_record_type * pRec )
659             {
660                 if ( m_Mutex.try_lock()) {
661                     // The thread becomes a combiner
662                     lock_guard l( m_Mutex, std::adopt_lock_t());
663
664                     // The record pRec can be excluded from publication list. Re-publish it
665                     republish( pRec );
666
667                     batch_combining( owner );
668                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
669                 }
670                 else {
671                     // There is another combiner, wait while it executes our request
672                     if ( !wait_for_combining( pRec )) {
673                         // The thread becomes a combiner
674                         lock_guard l( m_Mutex, std::adopt_lock_t());
675
676                         // The record pRec can be excluded from publication list. Re-publish it
677                         republish( pRec );
678
679                         batch_combining( owner );
680                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
681                     }
682                 }
683             }
684
685             template <class Container>
686             void combining( Container& owner )
687             {
688                 // The thread is a combiner
689                 assert( !m_Mutex.try_lock());
690
691                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
692
693                 unsigned int nEmptyPassCount = 0;
694                 unsigned int nUsefulPassCount = 0;
695                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) {
696                     if ( combining_pass( owner, nCurAge ))
697                         ++nUsefulPassCount;
698                     else if ( ++nEmptyPassCount > nUsefulPassCount )
699                         break;
700                 }
701
702                 m_Stat.onCombining();
703                 if ( (nCurAge & m_nCompactFactor) == 0 )
704                     compact_list( nCurAge );
705             }
706
707             template <class Container>
708             bool combining_pass( Container& owner, unsigned int nCurAge )
709             {
710                 publication_record* pPrev = nullptr;
711                 publication_record* p = m_pHead;
712                 bool bOpDone = false;
713                 while ( p ) {
714                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
715                         case active:
716                             if ( p->op() >= req_Operation ) {
717                                 p->nAge.store( nCurAge, memory_model::memory_order_release );
718                                 owner.fc_apply( static_cast<publication_record_type*>(p));
719                                 operation_done( *p );
720                                 bOpDone = true;
721                             }
722                             break;
723                         case inactive:
724                             // Only m_pHead can be inactive in the publication list
725                             assert( p == m_pHead );
726                             break;
727                         case removed:
728                             // The record should be removed
729                             p = unlink_and_delete_record( pPrev, p );
730                             continue;
731                         default:
732                             /// ??? That is impossible
733                             assert(false);
734                     }
735                     pPrev = p;
736                     p = p->pNext.load( memory_model::memory_order_acquire );
737                 }
738                 return bOpDone;
739             }
740
741             template <class Container>
742             void batch_combining( Container& owner )
743             {
744                 // The thread is a combiner
745                 assert( !m_Mutex.try_lock());
746
747                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
748
749                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
750                     owner.fc_process( begin(), end());
751
752                 combining_pass( owner, nCurAge );
753                 m_Stat.onCombining();
754                 if ( (nCurAge & m_nCompactFactor) == 0 )
755                     compact_list( nCurAge );
756             }
757
758             bool wait_for_combining( publication_record_type * pRec )
759             {
760                 m_waitStrategy.prepare( *pRec );
761                 m_Stat.onPassiveWait();
762
763                 while ( pRec->op( memory_model::memory_order_acquire ) != req_Response ) {
764                     // The record can be excluded from publication list. Reinsert it
765                     republish( pRec );
766
767                     m_Stat.onPassiveWaitIteration();
768
769                     // Wait while operation processing
770                     if ( m_waitStrategy.wait( *this, *pRec ))
771                         m_Stat.onWakeupByNotifying();
772
773                     if ( m_Mutex.try_lock()) {
774                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
775                             // Operation is done
776                             m_Mutex.unlock();
777
778                             // Wake up a pending threads
779                             m_waitStrategy.wakeup( *this );
780                             m_Stat.onPassiveWaitWakeup();
781
782                             break;
783                         }
784                         // The thread becomes a combiner
785                         m_Stat.onPassiveToCombiner();
786                         return false;
787                     }
788                 }
789                 return true;
790             }
791
792             void compact_list( unsigned int const nCurAge )
793             {
794                 // Thinning publication list
795                 publication_record * pPrev = nullptr;
796                 for ( publication_record * p = m_pHead; p; ) {
797                     if ( p->nState.load( memory_model::memory_order_acquire ) == active
798                       && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
799                     {
800                         if ( pPrev ) {
801                             publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
802                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
803                                 memory_model::memory_order_release, atomics::memory_order_relaxed ))
804                             {
805                                 p->nState.store( inactive, memory_model::memory_order_release );
806                                 p = pNext;
807                                 m_Stat.onDeactivatePubRecord();
808                                 continue;
809                             }
810                         }
811                     }
812                     pPrev = p;
813                     p = p->pNext.load( memory_model::memory_order_acquire );
814                 }
815
816                 m_Stat.onCompactPublicationList();
817             }
818
819             publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
820             {
821                 if ( pPrev ) {
822                     publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
823                     if ( pPrev->pNext.compare_exchange_strong( p, pNext,
824                         memory_model::memory_order_release, atomics::memory_order_relaxed ))
825                     {
826                         free_publication_record( static_cast<publication_record_type *>( p ));
827                         m_Stat.onDeletePubRecord();
828                     }
829                     return pNext;
830                 }
831                 else {
832                     m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
833                     free_publication_record( static_cast<publication_record_type *>( p ));
834                     m_Stat.onDeletePubRecord();
835                     return m_pHead;
836                 }
837             }
838             //@endcond
839         };
840
841         //@cond
842         class container
843         {
844         public:
845             template <typename PubRecord>
846             void fc_apply( PubRecord * )
847             {
848                 assert( false );
849             }
850
851             template <typename Iterator>
852             void fc_process( Iterator, Iterator )
853             {
854                 assert( false );
855             }
856         };
857         //@endcond
858
859     } // namespace flat_combining
860 }} // namespace cds::algo
861
862 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H