3 #ifndef CDSLIB_ALGO_FLAT_COMBINING_H
4 #define CDSLIB_ALGO_FLAT_COMBINING_H
7 #include <cds/algo/atomic.h>
8 #include <cds/details/allocator.h>
9 #include <cds/algo/backoff_strategy.h>
10 #include <cds/sync/spinlock.h>
11 #include <cds/opt/options.h>
12 #include <cds/algo/int_algo.h>
13 #include <boost/thread/tss.hpp> // thread_specific_ptr
15 namespace cds { namespace algo {
17 /// @defgroup cds_flat_combining_intrusive Intrusive flat combining containers
18 /// @defgroup cds_flat_combining_container Non-intrusive flat combining containers
22 @anchor cds_flat_combining_description
23 The flat combining (FC) technique was invented by Hendler, Incze, Shavit and Tzafrir in their paper
24 [2010] <i>"Flat Combining and the Synchronization-Parallelism Tradeoff"</i>.
25 The technique converts a sequential data structure to its concurrent implementation.
26 A few structures are added to the sequential implementation: a <i>global lock</i>,
27 a <i>count</i> of the number of combining passes, and a pointer to the <i>head</i>
28 of a <i>publication list</i>. The publication list is a list of thread-local records
29 of a size proportional to the number of threads that are concurrently accessing the shared object.
31 Each thread \p t accessing the structure to perform an invocation of some method \p m
32 on the shared object executes the following sequence of steps:
34 <li>Write the invocation opcode and parameters (if any) of the method \p m to be applied
35 sequentially to the shared object in the <i>request</i> field of your thread local publication
36 record (there is no need to use a load-store memory barrier). The <i>request</i> field will later
37 be used to receive the response. If your thread local publication record is marked as active
38 continue to step 2, otherwise continue to step 5.</li>
39 <li>Check if the global lock is taken. If so (another thread is an active combiner), spin on the <i>request</i>
40 field waiting for a response to the invocation (one can add a yield at this point to allow other threads
41 on the same core to run). Once in a while, while spinning, check that the lock is still taken and that your
42 record is active. If your record is inactive proceed to step 5. Once the response is available,
43 reset the request field to null and return the response.</li>
44 <li>If the lock is not taken, attempt to acquire it and become a combiner. If you fail,
45 return to spinning in step 2.</li>
46 <li>Otherwise, you hold the lock and are a combiner.
48 <li>Increment the combining pass count by one.</li>
49 <li>Execute a \p fc_apply() by traversing the publication list from the head,
50 combining all nonnull method call invocations, setting the <i>age</i> of each of these records
51 to the current <i>count</i>, applying the combined method calls to the structure D, and returning
52 responses to all the invocations. This traversal is guaranteed to be wait-free.</li>
53 <li>If the <i>count</i> is such that a cleanup needs to be performed, traverse the publication
54 list from the <i>head</i>. Starting from the second item (we always leave the item pointed to
55 by the head in the list), remove from the publication list all records whose <i>age</i> is
56 much smaller than the current <i>count</i>. This is done by removing the node and marking it
58 <li>Release the lock.</li>
60 <li>If you have no thread local publication record allocate one, marked as active. If you already
61 have one marked as inactive, mark it as active. Execute a store-load memory barrier. Proceed to insert
62 the record into the list with a successful CAS to the <i>head</i>. Then proceed to step 1.</li>
65 As the test results show, the flat combining technique is suitable for non-intrusive containers
66 like stack, queue, deque. For intrusive concurrent containers the flat combining demonstrates
67 less impressive results.
69 \ref cds_flat_combining_container "List of FC-based containers" in libcds.
71 \ref cds_flat_combining_intrusive "List of intrusive FC-based containers" in libcds.
73 namespace flat_combining {
75 /// Special values of publication_record::nRequest
// NOTE(review): the enum header lines are elided in this listing. These enumerators
// form the protocol between a publishing thread and the combiner:
// a record cycles req_EmptyRecord -> (opcode >= req_Operation) -> req_Response.
78 req_EmptyRecord, ///< Publication record is empty
79 req_Response, ///< Operation is done
// Derived containers define their opcodes starting from this value;
// combine()/batch_combine() assert nOpId >= req_Operation.
81 req_Operation ///< First operation id for derived classes
84 /// publication_record state
// Lifecycle: inactive (not in list) -> active (published) -> removed
// (marked by tls_cleanup; physically deleted by the combiner or the kernel dtor).
86 inactive, ///< Record is inactive
87 active, ///< Record is active
88 removed ///< Record should be removed
91 /// Record of publication list
93 Each data structure based on flat combining contains a class derived from \p %publication_record
95 struct publication_record {
96 atomics::atomic<unsigned int> nRequest; ///< Request field (depends on data structure)
97 atomics::atomic<unsigned int> nState; ///< Record state: inactive, active, removed
98 atomics::atomic<unsigned int> nAge; ///< Age of the record
99 atomics::atomic<publication_record *> pNext; ///< Next record in publication list
100 void * pOwner; ///< [internal data] Pointer to \ref kernel object that manages the publication list
102 /// Initializes publication record
// Constructor starts with nRequest = req_EmptyRecord; the remaining member
// initializers are elided in this listing.
104 : nRequest( req_EmptyRecord )
111 /// Returns the value of \p nRequest field
// Relaxed load: callers that need ordering (e.g. wait_for_combining) use their own
// acquire loads on nRequest.
112 unsigned int op() const
114 return nRequest.load( atomics::memory_order_relaxed );
117 /// Checks if the operation is done
// True once a combiner has stored req_Response via kernel::operation_done().
120 return nRequest.load( atomics::memory_order_relaxed ) == req_Response;
124 /// Flat combining internal statistics
// Counting statistics policy for kernel. Counter defaults to an event counter;
// all onXxx() hooks are called by kernel at the matching points in the algorithm.
125 template <typename Counter = cds::atomicity::event_counter >
128 typedef Counter counter_type; ///< Event counter type
130 counter_type m_nOperationCount ; ///< How many operations have been performed
131 counter_type m_nCombiningCount ; ///< Combining call count
132 counter_type m_nCompactPublicationList; ///< Count of publication list compacting
133 counter_type m_nDeactivatePubRecord; ///< How many publication records were deactivated during compacting
134 counter_type m_nActivatePubRecord; ///< Count of publication record activating
135 counter_type m_nPubRecordCreated ; ///< Count of created publication records
136 counter_type m_nPubRecordDeteted ; ///< Count of deleted publication records
137 counter_type m_nAcquirePubRecCount; ///< Count of acquiring publication record
138 counter_type m_nReleasePubRecCount; ///< Count on releasing publication record
140 /// Returns current combining factor
142 Combining factor is how many operations perform in one combine pass:
143 <tt>combining_factor := m_nOperationCount / m_nCombiningCount</tt>
// Guards against division by zero before any combining pass has happened.
145 double combining_factor() const
147 return m_nCombiningCount.get() ? double( m_nOperationCount.get()) / m_nCombiningCount.get() : 0.0;
// Event hooks invoked by kernel (see combine(), combining(), compact_list(), etc.)
151 void onOperation() { ++m_nOperationCount; }
152 void onCombining() { ++m_nCombiningCount; }
153 void onCompactPublicationList() { ++m_nCompactPublicationList; }
154 void onDeactivatePubRecord() { ++m_nDeactivatePubRecord; }
155 void onActivatPubRecord() { ++m_nActivatePubRecord; }
156 void onCreatePubRecord() { ++m_nPubRecordCreated; }
157 void onDeletePubRecord() { ++m_nPubRecordDeteted; }
158 void onAcquirePubRecord() { ++m_nAcquirePubRecCount; }
159 void onReleasePubRecord() { ++m_nReleasePubRecCount; }
163 /// Flat combining dummy internal statistics
// No-op statistics policy (the default in traits): every hook is an empty inline
// function, so statistics collection compiles away entirely.
167 void onOperation() {}
168 void onCombining() {}
169 void onCompactPublicationList() {}
170 void onDeactivatePubRecord() {}
171 void onActivatPubRecord() {}
172 void onCreatePubRecord() {}
173 void onDeletePubRecord() {}
174 void onAcquirePubRecord() {}
175 void onReleasePubRecord() {}
179 /// Type traits of \ref kernel class
181 You can define different type traits for \ref kernel
182 by specifying your struct based on \p %traits
183 or by using \ref make_traits metafunction.
// Default policy set: spin lock as the global combiner lock, short delay back-off,
// no statistics, relaxed C++ memory-ordering model.
187 typedef cds::sync::spin lock_type; ///< Lock type
188 typedef cds::backoff::delay_of<2> back_off; ///< Back-off strategy
189 typedef CDS_DEFAULT_ALLOCATOR allocator; ///< Allocator used for TLS data (allocating publication_record derivatives)
190 typedef empty_stat stat; ///< Internal statistics
191 typedef opt::v::relaxed_ordering memory_model; ///< C++ memory ordering model
194 /// Metafunction converting option list to traits
197 - \p opt::lock_type - mutex type, default is \p cds::sync::spin
198 - \p opt::back_off - back-off strategy, default is \p cds::backoff::delay_of<2>
199 - \p opt::allocator - allocator type, default is \ref CDS_DEFAULT_ALLOCATOR
200 - \p opt::stat - internal statistics, possible type: \ref stat, \ref empty_stat (the default)
201 - \p opt::memory_model - C++ memory ordering model.
202 List of all available memory ordering see \p opt::memory_model.
203 Default is \p cds::opt::v::relaxed_ordering
// Merges the user-supplied Options over the default flat_combining::traits;
// the Doxygen branch documents the result as implementation-defined.
205 template <typename... Options>
207 # ifdef CDS_DOXYGEN_INVOKED
208 typedef implementation_defined type ; ///< Metafunction result
210 typedef typename cds::opt::make_options<
211 typename cds::opt::find_type_traits< traits, Options... >::type
217 /// The kernel of flat combining
220 - \p PublicationRecord - a type derived from \ref publication_record
221 - \p Traits - a type traits of flat combining, default is \p flat_combining::traits.
222 \ref make_traits metafunction can be used to create type traits
224 The kernel object should be a member of a container class. The container cooperates with flat combining
225 kernel object. There are two ways to interact with the kernel:
226 - One-by-one processing the active records of the publication list. This mode is provided by the \p combine() function:
227 the container acquires its publication record by \p acquire_record(), fills its fields and calls
228 \p combine() function of its kernel object. If the current thread becomes a combiner, the kernel
229 calls \p fc_apply() function of the container for each active non-empty record. Then, the container
230 should release its publication record by \p release_record(). Only one pass through the publication
232 - Batch processing - \p batch_combine() function. In this mode the container obtains access
233 to entire publication list. This mode allows the container to perform an elimination, for example,
234 the stack can collide \p push() and \p pop() requests. The sequence of invocations is the following:
235 the container acquires its publication record by \p acquire_record(), fills its field and call
236 \p batch_combine() function of its kernel object. If the current thread becomes a combiner,
237 the kernel calls \p fc_process() function of the container passing two iterators pointing to
238 the begin and the end of publication list (see \ref iterator class). The iterators allow
239 multiple pass through active records of publication list. For each processed record the container
240 should call \p operation_done() function. On the end, the container should release
241 its record by \p release_record().
244 typename PublicationRecord
245 ,typename Traits = traits
// Policy typedefs resolved from Traits.
250 typedef PublicationRecord publication_record_type; ///< publication record type
251 typedef Traits traits; ///< Type traits
252 typedef typename traits::lock_type global_lock_type; ///< Global lock type
253 typedef typename traits::back_off back_off; ///< back-off strategy type
254 typedef typename traits::allocator allocator; ///< Allocator type (used for allocating publication_record_type data)
255 typedef typename traits::stat stat; ///< Internal statistics
256 typedef typename traits::memory_model memory_model; ///< C++ memory model
260 typedef cds::details::Allocator< publication_record_type, allocator > cxx11_allocator; ///< internal helper cds::details::Allocator
261 typedef std::lock_guard<global_lock_type> lock_guard;
// Kernel state.
265 atomics::atomic<unsigned int> m_nCount; ///< Total count of combining passes. Used as an age.
266 publication_record_type * m_pHead; ///< Head of publication list
// boost::thread_specific_ptr invokes tls_cleanup() for each thread's record on thread exit.
267 boost::thread_specific_ptr< publication_record_type > m_pThreadRec; ///< Thread-local publication record
268 mutable global_lock_type m_Mutex; ///< Global mutex
269 mutable stat m_Stat; ///< Internal statistics
// Stored as (power-of-two - 1) so combining() can test (age & mask) == 0 cheaply.
270 unsigned int const m_nCompactFactor; ///< Publication list compacting factor (the list will be compacted through \p %m_nCompactFactor combining passes)
271 unsigned int const m_nCombinePassCount; ///< Number of combining passes
274 /// Initializes the object
// Defaults: compact the list every 64 combining passes (mask 63),
278 Combiner pass count = 8
283 , m_pThreadRec( tls_cleanup )
284 , m_nCompactFactor( 64 - 1 ) // binary mask
285 , m_nCombinePassCount( 8 )
290 /// Initializes the object
292 unsigned int nCompactFactor ///< Publication list compacting factor (the list will be compacted through \p nCompactFactor combining passes)
293 ,unsigned int nCombinePassCount ///< Number of combining passes for combiner thread
297 , m_pThreadRec( tls_cleanup )
// ceil2 rounds nCompactFactor up to a power of two so that (value - 1) is a valid bit mask.
298 , m_nCompactFactor( (unsigned int)( cds::beans::ceil2( nCompactFactor ) - 1 )) // binary mask
299 , m_nCombinePassCount( nCombinePassCount )
304 /// Destroys the objects and mark all publication records as inactive
307 // mark all publication record as detached
// Walk the publication list; records already marked `removed` (by tls_cleanup of
// exited threads) are freed here. Advance p before touching pRec since pRec may be freed.
308 for ( publication_record * p = m_pHead; p; ) {
311 publication_record * pRec = p;
312 p = p->pNext.load( memory_model::memory_order_relaxed );
313 if ( pRec->nState.load( memory_model::memory_order_relaxed ) == removed )
314 free_publication_record( static_cast<publication_record_type *>( pRec ));
318 /// Gets publication list record for the current thread
320 If there is no publication record for the current thread
321 the function allocates it.
323 publication_record_type * acquire_record()
325 publication_record_type * pRec = m_pThreadRec.get();
327 // Allocate new publication record
328 pRec = cxx11_allocator().New();
329 pRec->pOwner = reinterpret_cast<void *>( this );
// reset() stores the record in boost TLS; tls_cleanup will run on thread exit.
330 m_pThreadRec.reset( pRec );
331 m_Stat.onCreatePubRecord();
// An inactive record must be (re)inserted into the publication list
// (publish() call elided in this listing) before use.
334 if ( pRec->nState.load( memory_model::memory_order_acquire ) != active )
337 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_EmptyRecord );
339 m_Stat.onAcquirePubRecord();
343 /// Marks publication record for the current thread as empty
// Must only be called after the operation completed (is_done()); returns the
// record to the req_EmptyRecord state so a combiner will skip it.
344 void release_record( publication_record_type * pRec )
346 assert( pRec->is_done() );
347 pRec->nRequest.store( req_EmptyRecord, memory_model::memory_order_release );
348 m_Stat.onReleasePubRecord();
351 /// Trying to execute operation \p nOpId
353 \p pRec is the publication record acquiring by \ref acquire_record earlier.
354 \p owner is a container that is owner of flat combining kernel object.
355 As a result the current thread can become a combiner or can wait for
356 another combiner performs \p pRec operation.
358 If the thread becomes a combiner, the kernel calls \p owner.fc_apply
359 for each active non-empty publication record.
361 template <class Container>
362 void combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
// Container opcodes must not collide with the reserved req_* values.
364 assert( nOpId >= req_Operation );
366 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
// Publish the request: the release store makes the operation's arguments
// (written by the caller before combine()) visible to the combiner.
367 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
369 m_Stat.onOperation();
371 try_combining( owner, pRec );
374 /// Trying to execute operation \p nOpId in batch-combine mode
376 \p pRec is the publication record acquiring by \ref acquire_record earlier.
377 \p owner is a container that owns flat combining kernel object.
378 As a result the current thread can become a combiner or can wait for
379 another combiner performs \p pRec operation.
381 If the thread becomes a combiner, the kernel calls \p owner.fc_process
382 giving the container the full access over publication list. This function
383 is useful for an elimination technique if the container supports any kind of
384 that. The container can perform multiple pass through publication list.
386 \p owner.fc_process has two arguments - forward iterators on begin and end of
387 publication list, see \ref iterator class. For each processed record the container
388 should call \ref operation_done function to mark the record as processed.
390 On the end of \p %batch_combine the \ref combine function is called
391 to process rest of publication records.
393 template <class Container>
394 void batch_combine( unsigned int nOpId, publication_record_type * pRec, Container& owner )
396 assert( nOpId >= req_Operation );
398 //assert( pRec->nState.load( memory_model::memory_order_relaxed ) == active );
399 pRec->nRequest.store( nOpId, memory_model::memory_order_release );
401 m_Stat.onOperation();
403 try_batch_combining( owner, pRec );
406 /// Waits for end of combining
// Acquiring and immediately releasing the global lock blocks until the current
// combiner (if any) has finished.
407 void wait_while_combining() const
409 lock_guard l( m_Mutex );
412 /// Marks \p rec as executed
414 This function should be called by container if batch_combine mode is used.
415 For usual combining (see \ref combine) this function is excess.
417 void operation_done( publication_record& rec )
// Release store pairs with the acquire load in wait_for_combining().
419 rec.nRequest.store( req_Response, memory_model::memory_order_release );
422 /// Internal statistics
423 stat const& statistics() const
429 // For container classes based on flat combining
430 stat& internal_statistics() const
436 /// Returns the compact factor
// m_nCompactFactor is stored as (factor - 1); report the user-facing value.
437 unsigned int compact_factor() const
439 return m_nCompactFactor + 1;
442 /// Returns number of combining passes for combiner thread
443 unsigned int combine_pass_count() const
445 return m_nCombinePassCount;
449 /// Publication list iterator
451 Iterators are intended for batch processing by container's
452 \p fc_process function.
453 The iterator allows iterate through active publication list.
459 publication_record_type * m_pRec;
464 iterator( publication_record_type * pRec )
// skip_inactive: advance past records that are not active or hold no pending
// operation (nRequest < req_Operation), so the container only sees real requests.
472 while ( m_pRec && (m_pRec->nState.load( memory_model::memory_order_acquire ) != active
473 || m_pRec->nRequest.load( memory_model::memory_order_relaxed) < req_Operation ))
475 m_pRec = static_cast<publication_record_type *>(m_pRec->pNext.load( memory_model::memory_order_acquire ));
481 /// Initializes an empty iterator object
487 iterator( iterator const& src )
488 : m_pRec( src.m_pRec )
// Pre-increment: step to pNext (skipping of inactive records elided in this listing).
492 iterator& operator++()
495 m_pRec = static_cast<publication_record_type *>( m_pRec->pNext.load( memory_model::memory_order_acquire ));
// Post-increment: returns the pre-step copy.
501 iterator operator++(int)
509 /// Dereference operator, can return \p nullptr
510 publication_record_type * operator ->()
515 /// Dereference operator, the iterator should not be an end iterator
516 publication_record_type& operator*()
522 /// Iterator equality
523 friend bool operator==( iterator it1, iterator it2 )
525 return it1.m_pRec == it2.m_pRec;
528 /// Iterator inequality
529 friend bool operator!=( iterator it1, iterator it2 )
531 return !( it1 == it2 );
535 /// Returns an iterator to the first active publication record
536 iterator begin() { return iterator(m_pHead); }
538 /// Returns an iterator to the end of publication list. Should not be dereferenced.
539 iterator end() { return iterator(); }
// TLS destructor registered with boost::thread_specific_ptr: called once per
// record when its owner thread exits.
543 static void tls_cleanup( publication_record_type * pRec )
546 // pRec that is TLS data should be excluded from publication list
548 if ( pRec->pOwner && pRec->nState.load(memory_model::memory_order_relaxed) == active ) {
549 // record is active and kernel is alive
// CAS active -> removed: the record stays linked; a later combiner pass
// (unlink_and_delete_record) or the kernel destructor frees it.
550 unsigned int nState = active;
551 pRec->nState.compare_exchange_strong( nState, removed, memory_model::memory_order_release, atomics::memory_order_relaxed );
554 // record is not in publication list or kernel already deleted
555 free_publication_record( pRec );
// Frees a record through the configured allocator.
560 static void free_publication_record( publication_record_type * pRec )
562 cxx11_allocator().Delete( pRec );
// init(): creates the head record for the constructing thread
// (enclosing lines elided in this listing).
567 assert( m_pThreadRec.get() == nullptr );
568 publication_record_type * pRec = cxx11_allocator().New();
571 m_pThreadRec.reset( pRec );
572 m_Stat.onCreatePubRecord();
// Links an inactive record into the publication list (lock-free w.r.t. other
// publishers: insertion is always right after m_pHead via CAS on m_pHead->pNext).
575 void publish( publication_record_type * pRec )
577 assert( pRec->nState.load( memory_model::memory_order_relaxed ) == inactive );
// Stamp the current age first so the record is not immediately compacted away.
579 pRec->nAge.store( m_nCount.load(memory_model::memory_order_acquire), memory_model::memory_order_release );
580 pRec->nState.store( active, memory_model::memory_order_release );
582 // Insert record to publication list
583 if ( m_pHead != static_cast<publication_record *>(pRec) ) {
584 publication_record * p = m_pHead->pNext.load(memory_model::memory_order_relaxed);
585 if ( p != static_cast<publication_record *>( pRec )) {
588 // Failed CAS changes p
589 } while ( !m_pHead->pNext.compare_exchange_weak( p, static_cast<publication_record *>(pRec),
590 memory_model::memory_order_release, atomics::memory_order_relaxed ));
591 m_Stat.onActivatPubRecord();
// Re-inserts a record that compact_list() marked inactive while its thread was waiting.
596 void republish( publication_record_type * pRec )
598 if ( pRec->nState.load( memory_model::memory_order_relaxed ) != active ) {
599 // The record has been excluded from publication list. Reinsert it
// Lock-or-wait protocol for one-by-one mode: become the combiner if the global
// lock is free, otherwise spin in wait_for_combining() until our request is
// answered — or until we are promoted to combiner ourselves.
604 template <class Container>
605 void try_combining( Container& owner, publication_record_type * pRec )
607 if ( m_Mutex.try_lock() ) {
608 // The thread becomes a combiner
// adopt_lock: try_lock() above already acquired the mutex; the guard only releases it.
609 lock_guard l( m_Mutex, std::adopt_lock_t() );
611 // The record pRec can be excluded from publication list. Re-publish it
615 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
618 // There is another combiner, wait while it executes our request
// wait_for_combining() returning false means it acquired the lock itself,
// so this thread becomes the combiner after all.
619 if ( !wait_for_combining( pRec ) ) {
620 // The thread becomes a combiner
621 lock_guard l( m_Mutex, std::adopt_lock_t() );
623 // The record pRec can be excluded from publication list. Re-publish it
627 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
// Same protocol as try_combining(), but the combiner runs batch_combining()
// (fc_process over the whole list) instead of per-record fc_apply.
632 template <class Container>
633 void try_batch_combining( Container& owner, publication_record_type * pRec )
635 if ( m_Mutex.try_lock() ) {
636 // The thread becomes a combiner
637 lock_guard l( m_Mutex, std::adopt_lock_t() );
639 // The record pRec can be excluded from publication list. Re-publish it
642 batch_combining( owner );
643 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
646 // There is another combiner, wait while it executes our request
647 if ( !wait_for_combining( pRec ) ) {
648 // The thread becomes a combiner
649 lock_guard l( m_Mutex, std::adopt_lock_t() );
651 // The record pRec can be excluded from publication list. Re-publish it
654 batch_combining( owner );
655 assert( pRec->nRequest.load( memory_model::memory_order_relaxed ) == req_Response );
// Combiner body: bump the global age, run up to m_nCombinePassCount passes
// (stopping early when a pass performed no operation), and periodically compact
// the publication list when the age crosses the compact-factor mask.
660 template <class Container>
661 void combining( Container& owner )
663 // The thread is a combiner
664 assert( !m_Mutex.try_lock() );
666 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
668 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
669 if ( !combining_pass( owner, nCurAge ))
672 m_Stat.onCombining();
// m_nCompactFactor is a power-of-two-minus-one mask, so this fires every
// (mask + 1) combining calls.
673 if ( (nCurAge & m_nCompactFactor) == 0 )
674 compact_list( nCurAge );
// One traversal of the publication list: apply every pending request via
// owner.fc_apply(), refresh each serviced record's age, and physically remove
// records marked `removed`. Returns whether any operation was performed
// (return statement elided in this listing).
677 template <class Container>
678 bool combining_pass( Container& owner, unsigned int nCurAge )
680 publication_record * pPrev = nullptr;
681 publication_record * p = m_pHead;
682 bool bOpDone = false;
684 switch ( p->nState.load( memory_model::memory_order_acquire )) {
// case active: service the record if it carries a pending opcode.
686 if ( p->op() >= req_Operation ) {
687 p->nAge.store( nCurAge, memory_model::memory_order_release );
688 owner.fc_apply( static_cast<publication_record_type *>(p) );
689 operation_done( *p );
694 // Only m_pHead can be inactive in the publication list
695 assert( p == m_pHead );
698 // The record should be removed
// unlink_and_delete_record returns the successor to continue the walk from.
699 p = unlink_and_delete_record( pPrev, p );
// default: unreachable — nState only ever holds inactive/active/removed.
702 /// ??? That is impossible
706 p = p->pNext.load( memory_model::memory_order_acquire );
// Batch-mode combiner body: hand the whole list to owner.fc_process() for each
// pass, then run one ordinary combining_pass() to finish any leftover requests,
// and compact the list on the same age schedule as combining().
711 template <class Container>
712 void batch_combining( Container& owner )
714 // The thread is a combiner
715 assert( !m_Mutex.try_lock() );
717 unsigned int const nCurAge = m_nCount.fetch_add( 1, memory_model::memory_order_release ) + 1;
719 for ( unsigned int nPass = 0; nPass < m_nCombinePassCount; ++nPass )
720 owner.fc_process( begin(), end() );
722 combining_pass( owner, nCurAge );
723 m_Stat.onCombining();
724 if ( (nCurAge & m_nCompactFactor) == 0 )
725 compact_list( nCurAge );
// Spins until the combiner answers this record's request (returns true), or until
// this thread manages to grab the global lock itself (returns false — the caller
// then becomes the combiner holding the already-acquired mutex).
728 bool wait_for_combining( publication_record_type * pRec )
731 while ( pRec->nRequest.load( memory_model::memory_order_acquire ) != req_Response ) {
733 // The record can be excluded from publication list. Reinsert it
738 if ( m_Mutex.try_lock() ) {
// Re-check after acquiring: the previous combiner may have just answered us.
739 if ( pRec->nRequest.load( memory_model::memory_order_acquire ) == req_Response ) {
743 // The thread becomes a combiner
// Removes active records whose age lags the current count by more than the
// compact factor — i.e. threads that have not submitted requests recently.
// Unlinked records are marked inactive (not freed): the owner thread republishes
// them on its next operation.
750 void compact_list( unsigned int const nCurAge )
752 // Thinning publication list
753 publication_record * pPrev = nullptr;
754 for ( publication_record * p = m_pHead; p; ) {
755 if ( p->nState.load( memory_model::memory_order_acquire ) == active
756 && p->nAge.load( memory_model::memory_order_acquire ) + m_nCompactFactor < nCurAge )
759 publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
// CAS may fail if the owner thread concurrently republished after pPrev;
// in that case the record simply stays in the list.
760 if ( pPrev->pNext.compare_exchange_strong( p, pNext,
761 memory_model::memory_order_release, atomics::memory_order_relaxed ))
763 p->nState.store( inactive, memory_model::memory_order_release );
765 m_Stat.onDeactivatePubRecord();
771 p = p->pNext.load( memory_model::memory_order_acquire );
774 m_Stat.onCompactPublicationList();
// Unlinks and frees a record marked `removed` (its owner thread has exited).
// Two paths: interior node (pPrev != nullptr, CAS pPrev->pNext past p) and
// head node (advance m_pHead). Returns the successor record.
777 publication_record * unlink_and_delete_record( publication_record * pPrev, publication_record * p )
780 publication_record * pNext = p->pNext.load( memory_model::memory_order_acquire );
781 if ( pPrev->pNext.compare_exchange_strong( p, pNext,
782 memory_model::memory_order_release, atomics::memory_order_relaxed ))
784 free_publication_record( static_cast<publication_record_type *>( p ));
785 m_Stat.onDeletePubRecord();
790 m_pHead = static_cast<publication_record_type *>( p->pNext.load( memory_model::memory_order_acquire ));
791 free_publication_record( static_cast<publication_record_type *>( p ));
792 m_Stat.onDeletePubRecord();
// Skeleton of the container interface the kernel expects (part of an example
// container class whose declaration is elided in this listing):
// fc_apply() services one publication record in one-by-one combine mode;
// fc_process() receives [begin, end) iterators in batch_combine mode.
803 template <typename PubRecord>
804 void fc_apply( PubRecord * )
809 template <typename Iterator>
810 void fc_process( Iterator, Iterator )
817 } // namespace flat_combining
818 }} // namespace cds::algo
820 #endif // #ifndef CDSLIB_ALGO_FLAT_COMBINING_H