Removed trailing spaces
[libcds.git] / cds / algo / flat_combining / kernel.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H
33
34 #include <cds/algo/flat_combining/defs.h>
35 #include <cds/algo/flat_combining/wait_strategy.h>
36
37 #include <cds/sync/spinlock.h>
38 #include <cds/details/allocator.h>
39 #include <cds/opt/options.h>
40 #include <cds/algo/int_algo.h>
41
42 namespace cds { namespace algo {
43
44     /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
45     /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
46
47     /// Flat combining
48     /**
49         @anchor cds_flat_combining_description
50         Flat combining (FC) technique is invented by Hendler, Incze, Shavit and Tzafrir in their paper
51         [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
52         The technique converts a sequential data structure to its concurrent implementation.
53         A few structures are added to the sequential implementation: a <i>global lock</i>,
54         a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
55         of a <i>publication list</i>. The publication list is a list of thread-local records
56         of a size proportional to the number of threads that are concurrently accessing the shared object.
57
58         Each thread \p t accessing the structure to perform an invocation of some method \p f()
59         on the shared object executes the following sequence of steps:
60         <ol>
61         <li>Write the invocation opcode and parameters (if any) of the method \p f() to be applied
62         sequentially to the shared object in the <i>request</i> field of your thread local publication
63         record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
64         be used to receive the response. If your thread local publication record is marked as active
65         continue to step 2, otherwise continue to step 5.</li>
66         <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
67         field waiting for a response to the invocation (one can add a yield at this point to allow other threads
68         on the same core to run). Once in a while while spinning check if the lock is still taken and that your
69         record is active (you may use any of \p wait_strategy instead of spinning). If your record is inactive proceed to step 5.
70         Once the response is available, reset the request field to null and return the response.</li>
71         <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
72         return to spinning in step 2.</li>
73         <li>Otherwise, you hold the lock and are a combiner.
74         <ul>
75             <li>Increment the combining pass count by one.</li>
76             <li>Execute a \p fc_apply() by traversing the publication list from the head,
77             combining all non-null method call invocations, setting the <i>age</i> of each of these records
78             to the current <i>count</i>, applying the combined method calls to the structure D, and returning
79             responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
80             <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
81             list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
82             by the head in the list), remove from the publication list all records whose <i>age</i> is
83             much smaller than the current <i>count</i>. This is done by removing the node and marking it
84             as inactive.</li>
85             <li>Release the lock.</li>
86         </ul>
87         <li>If you have no thread local publication record allocate one, marked as active. If you already
88         have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
89         the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
90         </ol>
91
92         As the test results show, the flat combining technique is suitable for non-intrusive containers
93         like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
94         less impressive results.
95
96         \ref cds_flat_combining_container "List of FC-based containers" in libcds.
97
98         \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
99     */
100     namespace flat_combining {
101
102         /// Flat combining internal statistics
103         template <typename Counter = cds::atomicity::event_counter >
104         struct stat
105         {
106             typedef Counter counter_type;   ///< Event counter type
107
108             counter_type    m_nOperationCount   ;   ///< How many operations have been performed
109             counter_type    m_nCombiningCount   ;   ///< Combining call count
110             counter_type    m_nCompactPublicationList; ///< Count of publication list compacting
111             counter_type    m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
112             counter_type    m_nActivatePubRecord;   ///< Count of publication record activating
113             counter_type    m_nPubRecordCreated ;   ///< Count of created publication records
114             counter_type    m_nPubRecordDeleted ;   ///< Count of deleted publication records
115             counter_type    m_nPassiveWaitCall;     ///< Count of passive waiting call (\p kernel::wait_for_combining())
116             counter_type    m_nPassiveWaitIteration;///< Count of iteration inside passive waiting
117             counter_type    m_nPassiveWaitWakeup;   ///< Count of forcing wake-up of passive wait cycle
118             counter_type    m_nInvokeExclusive;     ///< Count of call \p kernel::invoke_exclusive()
119             counter_type    m_nWakeupByNotifying;   ///< How many times the passive thread be waked up by a notification
120             counter_type    m_nPassiveToCombiner;   ///< How many times the passive thread becomes the combiner
121
122             /// Returns current combining factor
123             /**
124                 Combining factor is how many operations perform in one combine pass:
125                 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
126             */
127             double combining_factor() const
128             {
129                 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
130             }
131
132             //@cond
133             void    onOperation()               { ++m_nOperationCount;          }
134             void    onCombining()               { ++m_nCombiningCount;          }
135             void    onCompactPublicationList()  { ++m_nCompactPublicationList;  }
136             void    onDeactivatePubRecord()     { ++m_nDeactivatePubRecord;     }
137             void    onActivatePubRecord()       { ++m_nActivatePubRecord;       }
138             void    onCreatePubRecord()         { ++m_nPubRecordCreated;        }
139             void    onDeletePubRecord()         { ++m_nPubRecordDeleted;        }
140             void    onPassiveWait()             { ++m_nPassiveWaitCall;         }
141             void    onPassiveWaitIteration()    { ++m_nPassiveWaitIteration;    }
142             void    onPassiveWaitWakeup()       { ++m_nPassiveWaitWakeup;       }
143             void    onInvokeExclusive()         { ++m_nInvokeExclusive;         }
144             void    onWakeupByNotifying()       { ++m_nWakeupByNotifying;       }
145             void    onPassiveToCombiner()       { ++m_nPassiveToCombiner;       }
146
147             //@endcond
148         };
149
150         /// Flat combining dummy internal statistics
151         struct empty_stat
152         {
153             //@cond
154             void    onOperation()               const {}
155             void    onCombining()               const {}
156             void    onCompactPublicationList()  const {}
157             void    onDeactivatePubRecord()     const {}
158             void    onActivatePubRecord()       const {}
159             void    onCreatePubRecord()         const {}
160             void    onDeletePubRecord()         const {}
161             void    onPassiveWait()             const {}
162             void    onPassiveWaitIteration()    const {}
163             void    onPassiveWaitWakeup()       const {}
164             void    onInvokeExclusive()         const {}
165             void    onWakeupByNotifying()       const {}
166             void    onPassiveToCombiner()       const {}
167             //@endcond
168         };
169
170         /// Type traits of \ref kernel class
171         /**
172             You can define different type traits for \ref kernel
173             by specifying your struct based on \p %traits
174             or by using \ref make_traits metafunction.
175         */
176         struct traits
177         {
178             typedef cds::sync::spin             lock_type;  ///< Lock type
179             typedef cds::algo::flat_combining::wait_strategy::backoff< cds::backoff::delay_of<2>> wait_strategy; ///< Wait strategy
180             typedef CDS_DEFAULT_ALLOCATOR       allocator;  ///< Allocator used for TLS data (allocating \p publication_record derivatives)
181             typedef empty_stat                  stat;       ///< Internal statistics
182             typedef opt::v::relaxed_ordering  memory_model; ///< /// C++ memory ordering model
183         };
184
185         /// Metafunction converting option list to traits
186         /**
187             \p Options are:
188             - \p opt::lock_type - mutex type, default is \p cds::sync::spin
189             - \p opt::wait_strategy - wait strategy, see \p wait_strategy namespace, default is \p wait_strategy::backoff.
190             - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
191             - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
192             - \p opt::memory_model - C++ memory ordering model.
193                 List of all available memory ordering see \p opt::memory_model.
194                 Default is \p cds::opt::v::relaxed_ordering
195         */
196         template <typename... Options>
197         struct make_traits {
198 #   ifdef CDS_DOXYGEN_INVOKED
199             typedef implementation_defined type ;   ///< Metafunction result
200 #   else
201             typedef typename cds::opt::make_options<
202                 typename cds::opt::find_type_traits< traits, Options... >::type
203                 ,Options...
204             >::type   type;
205 #   endif
206         };
207
208         /// The kernel of flat combining
209         /**
210             Template parameters:
211             - \p PublicationRecord - a type derived from \ref publication_record
212             - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
213                 \ref make_traits metafunction can be used to create type traits
214
215             The kernel object should be a member of a container class. The container cooperates with flat combining
216             kernel object. There are two ways to interact with the kernel:
217             - One-by-one processing the active records of the publication list. This mode provides by \p combine() function:
218               the container acquires its publication record by \p acquire_record(), fills its fields and calls
219               \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
220               calls \p fc_apply() function of the container for each active non-empty record. Then, the container
221               should release its publication record by \p release_record(). Only one pass through the publication
222               list is possible.
223             - Batch processing - \p batch_combine() function. It this mode the container obtains access
224               to entire publication list. This mode allows the container to perform an elimination, for example,
225               the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
226               the container acquires its publication record by \p acquire_record(), fills its field and call
227               \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
228               the kernel calls \p fc_process() function of the container passing two iterators pointing to
229               the begin and the end of publication list (see \ref iterator class). The iterators allow
230               multiple pass through active records of publication list. For each processed record the container
231               should call \p operation_done() function. On the end, the container should release
232               its record by \p release_record().
233         */
234         template <
235             typename PublicationRecord
236             ,typename Traits = traits
237         >
238         class kernel
239         {
240         public:
241             typedef Traits   traits;                               ///< Type traits
242             typedef typename traits::lock_type global_lock_type;   ///< Global lock type
243             typedef typename traits::wait_strategy wait_strategy;  ///< Wait strategy type
244             typedef typename traits::allocator allocator;          ///< Allocator type (used for allocating publication_record_type data)
245             typedef typename traits::stat      stat;               ///< Internal statistics
246             typedef typename traits::memory_model memory_model;    ///< C++ memory model
247
248             typedef typename wait_strategy::template make_publication_record<PublicationRecord>::type publication_record_type; ///< Publication record type
249
250         protected:
251             //@cond
252             typedef cds::details::Allocator< publication_record_type, allocator >   cxx11_allocator; ///< internal helper cds::details::Allocator
253             typedef std::lock_guard<global_lock_type> lock_guard;
254             //@endcond
255
256         protected:
257             atomics::atomic<unsigned int>  m_nCount;    ///< Total count of combining passes. Used as an age.
258             publication_record_type*    m_pHead;        ///< Head of active publication list
259             publication_record_type*    m_pAllocatedHead; ///< Head of allocated publication list
260             boost::thread_specific_ptr< publication_record_type > m_pThreadRec;   ///< Thread-local publication record
261             mutable global_lock_type    m_Mutex;        ///< Global mutex
262             mutable stat                m_Stat;         ///< Internal statistics
263             unsigned int const          m_nCompactFactor;    ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
264             unsigned int const          m_nCombinePassCount; ///< Number of combining passes
265             wait_strategy               m_waitStrategy;      ///< Wait strategy
266
267         public:
268             /// Initializes the object
269             /**
270                 Compact factor = 1024
271
272                 Combiner pass count = 8
273             */
274             kernel()
275                 : kernel( 1024, 8 )
276             {}
277
278             /// Initializes the object
279             kernel(
280                 unsigned int nCompactFactor  ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
281                 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
282                 )
283                 : m_nCount(0)
284                 , m_pHead( nullptr )
285                 , m_pAllocatedHead( nullptr )
286                 , m_pThreadRec( tls_cleanup )
287                 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 ))   // binary mask
288                 , m_nCombinePassCount( nCombinePassCount )
289             {
290                 assert( m_pThreadRec.get() == nullptr );
291                 publication_record_type* pRec = cxx11_allocator().New();
292                 m_pAllocatedHead =
293                     m_pHead = pRec;
294                 m_pThreadRec.reset( pRec );
295                 m_Stat.onCreatePubRecord();
296             }
297
298             /// Destroys the object and all publication records
299             ~kernel()
300             {
301                 m_pThreadRec.reset();   // calls tls_cleanup()
302
303                 // delete all publication records
304                 for ( publication_record* p = m_pAllocatedHead; p; ) {
305                     publication_record * pRec = p;
306                     p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
307                     free_publication_record( static_cast<publication_record_type *>( pRec ));
308                 }
309             }
310
311             /// Gets publication list record for the current thread
312             /**
313                 If there is no publication record for the current thread
314                 the function allocates it.
315             */
316             publication_record_type * acquire_record()
317             {
318                 publication_record_type * pRec = m_pThreadRec.get();
319                 if ( !pRec ) {
320                     // Allocate new publication record
321                     pRec = cxx11_allocator().New();
322                     m_pThreadRec.reset( pRec );
323                     m_Stat.onCreatePubRecord();
324
325                     // Insert in allocated list
326                     assert( m_pAllocatedHead != nullptr );
327                     publication_record* p = m_pAllocatedHead->pNextAllocated.load( memory_model::memory_order_acquire );
328                     do {
329                         pRec->pNextAllocated.store( p, memory_model::memory_order_relaxed );
330                     } while ( !m_pAllocatedHead->pNextAllocated.compare_exchange_weak( p, pRec, memory_model::memory_order_release, atomics::memory_order_acquire ));
331
332                     publish( pRec );
333                 }
334                 else if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
335                     publish( pRec );
336
337                 assert( pRec->op() == req_EmptyRecord );
338
339                 return pRec;
340             }
341
342             /// Marks publication record for the current thread as empty
343             void release_record( publication_record_type * pRec )
344             {
345                 assert( pRec->is_done());
346                 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
347             }
348
349             /// Trying to execute operation \p nOpId
350             /**
351                 \p pRec is the publication record acquiring by \ref acquire_record earlier.
352                 \p owner is a container that is owner of flat combining kernel object.
353                 As a result the current thread can become a combiner or can wait for
354                 another combiner performs \p pRec operation.
355
356                 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
357                 for each active non-empty publication record.
358             */
359             template <class Container>
360             void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
361             {
362                 assert( nOpId >= req_Operation );
363                 assert( pRec );
364
365                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
366                 m_Stat.onOperation();
367
368                 try_combining( owner, pRec );
369             }
370
371             /// Trying to execute operation \p nOpId in batch-combine mode
372             /**
373                 \p pRec is the publication record acquiring by \p acquire_record() earlier.
374                 \p owner is a container that owns flat combining kernel object.
375                 As a result the current thread can become a combiner or can wait for
376                 another combiner performs \p pRec operation.
377
378                 If the thread becomes a combiner, the kernel calls \p owner.fc_process()
379                 giving the container the full access over publication list. This function
380                 is useful for an elimination technique if the container supports any kind of
381                 that. The container can perform multiple pass through publication list.
382
383                 \p owner.fc_process() has two arguments - forward iterators on begin and end of
384                 publication list, see \ref iterator class. For each processed record the container
385                 should call \p operation_done() function to mark the record as processed.
386
387                 On the end of \p %batch_combine the \p combine() function is called
388                 to process rest of publication records.
389             */
390             template <class Container>
391             void batch_combine( unsigned int nOpId, publication_record_type* pRec, Container& owner )
392             {
393                 assert( nOpId >= req_Operation );
394                 assert( pRec );
395
396                 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
397                 m_Stat.onOperation();
398
399                 try_batch_combining( owner, pRec );
400             }
401
402             /// Invokes \p Func in exclusive mode
403             /**
404                 Some operation in flat combining containers should be called in exclusive mode
405                 i.e the current thread should become the combiner to process the operation.
406                 The typical example is \p empty() function.
407
408                 \p %invoke_exclusive() allows do that: the current thread becomes the combiner,
409                 invokes \p f exclusively but unlike a typical usage the thread does not process any pending request.
410                 Instead, after end of \p f call the current thread wakes up a pending thread if any.
411             */
412             template <typename Func>
413             void invoke_exclusive( Func f )
414             {
415                 {
416                     lock_guard l( m_Mutex );
417                     f();
418                 }
419                 m_waitStrategy.wakeup( *this );
420                 m_Stat.onInvokeExclusive();
421             }
422
423             /// Marks \p rec as executed
424             /**
425                 This function should be called by container if \p batch_combine mode is used.
426                 For usual combining (see \p combine()) this function is excess.
427             */
428             void operation_done( publication_record& rec )
429             {
430                 rec.nRequest.store( req_Response, memory_model::memory_order_release );
431                 m_waitStrategy.notify( *this, static_cast<publication_record_type&>( rec ));
432             }
433
434             /// Internal statistics
435             stat const& statistics() const
436             {
437                 return m_Stat;
438             }
439
440             //@cond
441             // For container classes based on flat combining
442             stat& internal_statistics() const
443             {
444                 return m_Stat;
445             }
446             //@endcond
447
448             /// Returns the compact factor
449             unsigned int compact_factor() const
450             {
451                 return m_nCompactFactor + 1;
452             }
453
454             /// Returns number of combining passes for combiner thread
455             unsigned int combine_pass_count() const
456             {
457                 return m_nCombinePassCount;
458             }
459
460         public:
461             /// Publication list iterator
462             /**
463                 Iterators are intended for batch processing by container's
464                 \p fc_process function.
465                 The iterator allows iterate through active publication list.
466             */
467             class iterator
468             {
469                 //@cond
470                 friend class kernel;
471                 publication_record_type * m_pRec;
472                 //@endcond
473
474             protected:
475                 //@cond
476                 iterator( publication_record_type * pRec )
477                     : m_pRec( pRec )
478                 {
479                     skip_inactive();
480                 }
481
482                 void skip_inactive()
483                 {
484                     while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
485                                     || m_pRec->op( memory_model::memory_order_relaxed) < req_Operation ))
486                     {
487                         m_pRec = static_cast<publication_record_type*>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
488                     }
489                 }
490                 //@endcond
491
492             public:
493                 /// Initializes an empty iterator object
494                 iterator()
495                     : m_pRec( nullptr )
496                 {}
497
498                 /// Copy ctor
499                 iterator( iterator const& src )
500                     : m_pRec( src.m_pRec )
501                 {}
502
503                 /// Pre-increment
504                 iterator& operator++()
505                 {
506                     assert( m_pRec );
507                     m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
508                     skip_inactive();
509                     return *this;
510                 }
511
512                 /// Post-increment
513                 iterator operator++(int)
514                 {
515                     assert( m_pRec );
516                     iterator it(*this);
517                     ++(*this);
518                     return it;
519                 }
520
521                 /// Dereference operator, can return \p nullptr
522                 publication_record_type* operator ->()
523                 {
524                     return m_pRec;
525                 }
526
527                 /// Dereference operator, the iterator should not be an end iterator
528                 publication_record_type& operator*()
529                 {
530                     assert( m_pRec );
531                     return *m_pRec;
532                 }
533
534                 /// Iterator equality
535                 friend bool operator==( iterator it1, iterator it2 )
536                 {
537                     return it1.m_pRec == it2.m_pRec;
538                 }
539
540                 /// Iterator inequality
541                 friend bool operator!=( iterator it1, iterator it2 )
542                 {
543                     return !( it1 == it2 );
544                 }
545             };
546
547             /// Returns an iterator to the first active publication record
548             iterator begin()    { return iterator(m_pHead); }
549
550             /// Returns an iterator to the end of publication list. Should not be dereferenced.
551             iterator end()      { return iterator(); }
552
553         public:
554             /// Gets current value of \p rec.nRequest
555             /**
556                 This function is intended for invoking from a wait strategy
557             */
558             int get_operation( publication_record& rec )
559             {
560                 return rec.op( memory_model::memory_order_acquire );
561             }
562
563             /// Wakes up any waiting thread
564             /**
565                 This function is intended for invoking from a wait strategy
566             */
567             void wakeup_any()
568             {
569                 publication_record* pRec = m_pHead;
570                 while ( pRec ) {
571                     if ( pRec->nState.load( memory_model::memory_order_acquire ) == active
572                       && pRec->op( memory_model::memory_order_acquire ) >= req_Operation )
573                     {
574                         m_waitStrategy.notify( *this, static_cast<publication_record_type&>( *pRec ));
575                         break;
576                     }
577                     pRec = pRec->pNext.load( memory_model::memory_order_acquire );
578                 }
579             }
580
581         private:
582             //@cond
583             static void tls_cleanup( publication_record_type* pRec )
584             {
585                 // Thread done
586                 // pRec that is TLS data should be excluded from publication list
587                 pRec->nState.store( removed, memory_model::memory_order_release );
588             }
589
590             void free_publication_record( publication_record_type* pRec )
591             {
592                 cxx11_allocator().Delete( pRec );
593                 m_Stat.onDeletePubRecord();
594             }
595
596             void publish( publication_record_type* pRec )
597             {
598                 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
599
600                 pRec->nAge.store( m_nCount.load(memory_model::memory_order_relaxed), memory_model::memory_order_relaxed );
601                 pRec->nState.store( active, memory_model::memory_order_relaxed );
602
603                 // Insert record to publication list
604                 if ( m_pHead != static_cast<publication_record *>(pRec)) {
605                     publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
606                     if ( p != static_cast<publication_record *>( pRec )) {
607                         do {
608                             pRec->pNext.store( p, memory_model::memory_order_relaxed );
609                             // Failed CAS changes p
610                         } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
611                             memory_model::memory_order_release, atomics::memory_order_acquire ));
612                         m_Stat.onActivatePubRecord();
613                     }
614                 }
615             }
616
617             void republish( publication_record_type* pRec )
618             {
619                 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
620                     // The record has been excluded from publication list. Reinsert it
621                     publish( pRec );
622                 }
623             }
624
625             template <class Container>
626             void try_combining( Container& owner, publication_record_type* pRec )
627             {
628                 if ( m_Mutex.try_lock()) {
629                     // The thread becomes a combiner
630                     lock_guard l( m_Mutex, std::adopt_lock_t());
631
632                     // The record pRec can be excluded from publication list. Re-publish it
633                     republish( pRec );
634
635                     combining( owner );
636                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
637                 }
638                 else {
639                     // There is another combiner, wait while it executes our request
640                     if ( !wait_for_combining( pRec )) {
641                         // The thread becomes a combiner
642                         lock_guard l( m_Mutex, std::adopt_lock_t());
643
644                         // The record pRec can be excluded from publication list. Re-publish it
645                         republish( pRec );
646
647                         combining( owner );
648                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
649                     }
650                 }
651             }
652
653             template <class Container>
654             void try_batch_combining( Container& owner, publication_record_type * pRec )
655             {
656                 if ( m_Mutex.try_lock()) {
657                     // The thread becomes a combiner
658                     lock_guard l( m_Mutex, std::adopt_lock_t());
659
660                     // The record pRec can be excluded from publication list. Re-publish it
661                     republish( pRec );
662
663                     batch_combining( owner );
664                     assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
665                 }
666                 else {
667                     // There is another combiner, wait while it executes our request
668                     if ( !wait_for_combining( pRec )) {
669                         // The thread becomes a combiner
670                         lock_guard l( m_Mutex, std::adopt_lock_t());
671
672                         // The record pRec can be excluded from publication list. Re-publish it
673                         republish( pRec );
674
675                         batch_combining( owner );
676                         assert( pRec->op( memory_model::memory_order_relaxed ) == req_Response );
677                     }
678                 }
679             }
680
681             template <class Container>
682             void combining( Container& owner )
683             {
684                 // The thread is a combiner
685                 assert( !m_Mutex.try_lock());
686
687                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
688
689                 unsigned int nEmptyPassCount = 0;
690                 unsigned int nUsefulPassCount = 0;
691                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass ) {
692                     if ( combining_pass( owner, nCurAge ))
693                         ++nUsefulPassCount;
694                     else if ( ++nEmptyPassCount > nUsefulPassCount )
695                         break;
696                 }
697
698                 m_Stat.onCombining();
699                 if ( ( nCurAge & m_nCompactFactor ) == 0 )
700                     compact_list( nCurAge );
701             }
702
703             template <class Container>
704             bool combining_pass( Container& owner, unsigned int nCurAge )
705             {
706                 publication_record* p = m_pHead;
707                 bool bOpDone = false;
708                 while ( p ) {
709                     switch ( p->nState.load( memory_model::memory_order_acquire )) {
710                     case active:
711                         if ( p->op() >= req_Operation ) {
712                             p->nAge.store( nCurAge, memory_model::memory_order_relaxed );
713                             owner.fc_apply( static_cast<publication_record_type*>( p ));
714                             operation_done( *p );
715                             bOpDone = true;
716                         }
717                         break;
718                     case inactive:
719                         // Only m_pHead can be inactive in the publication list
720                         assert( p == m_pHead );
721                         break;
722                     case removed:
723                         // Such record will be removed on compacting phase
724                         break;
725                     default:
726                         /// ??? That is impossible
727                         assert( false );
728                     }
729                     p = p->pNext.load( memory_model::memory_order_acquire );
730                 }
731                 return bOpDone;
732             }
733
734             template <class Container>
735             void batch_combining( Container& owner )
736             {
737                 // The thread is a combiner
738                 assert( !m_Mutex.try_lock());
739
740                 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_relaxed ) + 1;
741
742                 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
743                     owner.fc_process( begin(), end());
744
745                 combining_pass( owner, nCurAge );
746                 m_Stat.onCombining();
747                 if ( ( nCurAge & m_nCompactFactor ) == 0 )
748                     compact_list( nCurAge );
749             }
750
751             bool wait_for_combining( publication_record_type * pRec )
752             {
753                 m_waitStrategy.prepare( *pRec );
754                 m_Stat.onPassiveWait();
755
756                 while ( pRec->op( memory_model::memory_order_acquire ) != req_Response ) {
757                     // The record can be excluded from publication list. Reinsert it
758                     republish( pRec );
759
760                     m_Stat.onPassiveWaitIteration();
761
762                     // Wait while operation processing
763                     if ( m_waitStrategy.wait( *this, *pRec ))
764                         m_Stat.onWakeupByNotifying();
765
766                     if ( m_Mutex.try_lock()) {
767                         if ( pRec->op( memory_model::memory_order_acquire ) == req_Response ) {
768                             // Operation is done
769                             m_Mutex.unlock();
770
771                             // Wake up a pending threads
772                             m_waitStrategy.wakeup( *this );
773                             m_Stat.onPassiveWaitWakeup();
774
775                             break;
776                         }
777                         // The thread becomes a combiner
778                         m_Stat.onPassiveToCombiner();
779                         return false;
780                     }
781                 }
782                 return true;
783             }
784
785             void compact_list( unsigned int nCurAge )
786             {
787                 // Compacts publication list
788                 // This function is called only by combiner thread
789
790             try_again:
791                 publication_record * pPrev = m_pHead;
792                 for ( publication_record * p = pPrev->pNext.load( memory_model::memory_order_acquire ); p; ) {
793                     switch ( p->nState.load( memory_model::memory_order_relaxed )) {
794                     case active:
795                         if ( p->nAge.load( memory_model::memory_order_relaxed ) + m_nCompactFactor < nCurAge )
796                         {
797                             publication_record * pNext = p->pNext.load( memory_model::memory_order_relaxed );
798                             if ( pPrev->pNext.compare_exchange_strong( p, pNext,
799                                 memory_model::memory_order_acquire, atomics::memory_order_relaxed ))
800                             {
801                                 p->nState.store( inactive, memory_model::memory_order_release );
802                                 p = pNext;
803                                 m_Stat.onDeactivatePubRecord();
804                                 continue;
805                             }
806                         }
807                         break;
808
809                     case removed:
810                         publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
811                         if ( cds_likely( pPrev->pNext.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed ))) {
812                             p = pNext;
813                             continue;
814                         }
815                         else {
816                             // CAS can be failed only in beginning of list
817                             assert( pPrev == m_pHead );
818                             goto try_again;
819                         }
820                     }
821                     pPrev = p;
822                     p = p->pNext.load( memory_model::memory_order_acquire );
823                 }
824
825                 // Iterate over allocated list to find removed records
826                 pPrev = m_pAllocatedHead;
827                 for ( publication_record * p = pPrev->pNextAllocated.load( memory_model::memory_order_acquire ); p; ) {
828                     if ( p->nState.load( memory_model::memory_order_relaxed ) == removed ) {
829                         publication_record * pNext = p->pNextAllocated.load( memory_model::memory_order_relaxed );
830                         if ( pPrev->pNextAllocated.compare_exchange_strong( p, pNext, memory_model::memory_order_acquire, atomics::memory_order_relaxed )) {
831                             free_publication_record( static_cast<publication_record_type *>( p ));
832                             p = pNext;
833                             continue;
834                         }
835                     }
836
837                     pPrev = p;
838                     p = p->pNextAllocated.load( memory_model::memory_order_relaxed );
839                 }
840
841                 m_Stat.onCompactPublicationList();
842             }
843             //@endcond
844         };
845
846         //@cond
847         class container
848         {
849         public:
850             template <typename PubRecord>
851             void fc_apply( PubRecord * )
852             {
853                 assert( false );
854             }
855
856             template <typename Iterator>
857             void fc_process( Iterator, Iterator )
858             {
859                 assert( false );
860             }
861         };
862         //@endcond
863
864     } // namespace flat_combining
865 }} // namespace cds::algo
866
867 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_KERNEL_H