Merge branch 'ldionne-ldionne-cmake' into dev
[libcds.git] / cds / algo / flat_combining / wait_strategy.h
1 /*
2     This file is a part of libcds - Concurrent Data Structures library
3
4     (C) Copyright Maxim Khizhinsky (libcds.dev@gmail.com) 2006-2017
5
6     Source code repo: http://github.com/khizmax/libcds/
7     Download: http://sourceforge.net/projects/libcds/files/
8
9     Redistribution and use in source and binary forms, with or without
10     modification, are permitted provided that the following conditions are met:
11
12     * Redistributions of source code must retain the above copyright notice, this
13       list of conditions and the following disclaimer.
14
15     * Redistributions in binary form must reproduce the above copyright notice,
16       this list of conditions and the following disclaimer in the documentation
17       and/or other materials provided with the distribution.
18
19     THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
20     AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
21     IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
22     DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
23     FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
24     DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
25     SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
26     CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
27     OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
28     OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
29 */
30
31 #ifndef CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H
32 #define CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H
33
34 #include <cds/algo/flat_combining/defs.h>
35 #include <cds/algo/backoff_strategy.h>
36 #include <mutex>
37 #include <condition_variable>
38 #include <boost/thread/tss.hpp>  // thread_specific_ptr
39
40
41 namespace cds { namespace opt {
42
43     /// Wait strategy option for \p flat_combining::kernel
44     template <typename Strategy>
45     struct wait_strategy {
46         //@cond
47         template <typename Base> struct pack: public Base
48         {
49             typedef Strategy wait_strategy;
50         };
51         //@endcond
52     };
53
54 }} // namespace cds::opt
55
56 namespace cds { namespace algo { namespace flat_combining {
57
58     /// Wait strategies for \p flat_combining technique
59     /**
60         Wait strategy specifies how a thread waits until its request is performed by the combiner.
61         See \p wait_strategy::empty wait strategy to explain the interface.
62     */
63     namespace wait_strategy {
64
65         /// Empty wait strategy
66         /**
67             Empty wait strategy is just spinning on request field.
68             All functions are empty.
69         */
70         struct empty
71         {
72             /// Metafunction for defining a publication record for flat combining technique
73             /**
74                 Any wait strategy may expand the publication record for storing
75                 its own private data.
76                 \p PublicationRecord is the type specified by \p flat_combining::kernel.
77                 - If the strategy has no thread-private data, it should typedef \p PublicationRecord
78                   as a return \p type of metafunction.
79                 - Otherwise, if the strategy wants to store anything in thread-local data,
80                   it should expand \p PublicationRecord, for example:
81                   \code
82                   template <typename PublicationRecord>
83                   struct make_publication_record {
84                     struct type: public PublicationRecord
85                     {
86                         int strategy_data;
87                     };
88                   };
89                   \endcode
90             */
91             template <typename PublicationRecord>
92             struct make_publication_record {
93                 typedef PublicationRecord type; ///< Metafunction result
94             };
95
96             /// Prepares the strategy
97             /**
98                 This function is called before enter to waiting cycle.
99                 Some strategies need to prepare its thread-local data in \p rec.
100
101                 \p PublicationRecord is thread's publication record of type \p make_publication_record::type
102             */
103             template <typename PublicationRecord>
104             void prepare( PublicationRecord& rec )
105             {
106                 CDS_UNUSED( rec );
107             }
108
109             /// Waits for the combiner
110             /**
111                 The thread calls this function to wait for the combiner process
112                 the request.
113                 The function returns \p true if the thread was waked up by the combiner,
114                 otherwise it should return \p false.
115
116                 \p FCKernel is a \p flat_combining::kernel object,
117                 \p PublicationRecord is thread's publication record of type \p make_publication_record::type
118             */
119             template <typename FCKernel, typename PublicationRecord>
120             bool wait( FCKernel& fc, PublicationRecord& rec )
121             {
122                 CDS_UNUSED( fc );
123                 CDS_UNUSED( rec );
124                 return false;
125             }
126
127             /// Wakes up the thread
128             /**
129                 The combiner calls \p %notify() when it has been processed the request.
130
131                 \p FCKernel is a \p flat_combining::kernel object,
132                 \p PublicationRecord is thread's publication record of type \p make_publication_record::type
133             */
134             template <typename FCKernel, typename PublicationRecord>
135             void notify( FCKernel& fc, PublicationRecord& rec )
136             {
137                 CDS_UNUSED( fc );
138                 CDS_UNUSED( rec );
139             }
140
141             /// Moves control to other thread
142             /**
143                 This function is called when the thread becomes the combiner
144                 but the request of the thread is already processed.
145                 The strategy may call \p fc.wakeup_any() instructs the kernel
146                 to wake up any pending thread.
147
148                 \p FCKernel is a \p flat_combining::kernel object,
149             */
150             template <typename FCKernel>
151             void wakeup( FCKernel& fc )
152             {
153                 CDS_UNUSED( fc );
154             }
155         };
156
157         /// Back-off wait strategy
158         /**
159             Template argument \p Backoff specifies back-off strategy, default is cds::backoff::delay_of<2>
160         */
161         template <typename BackOff = cds::backoff::delay_of<2>>
162         struct backoff
163         {
164             typedef BackOff back_off;   ///< Back-off strategy
165
166             /// Incorporates back-off strategy into publication record
167             template <typename PublicationRecord>
168             struct make_publication_record
169             {
170                 //@cond
171                 struct type: public PublicationRecord
172                 {
173                     back_off bkoff;
174                 };
175                 //@endcond
176             };
177
178             /// Resets back-off strategy in \p rec
179             template <typename PublicationRecord>
180             void prepare( PublicationRecord& rec )
181             {
182                 rec.bkoff.reset();
183             }
184
185             /// Calls back-off strategy
186             template <typename FCKernel, typename PublicationRecord>
187             bool wait( FCKernel& /*fc*/, PublicationRecord& rec )
188             {
189                 rec.bkoff();
190                 return false;
191             }
192
193             /// Does nothing
194             template <typename FCKernel, typename PublicationRecord>
195             void notify( FCKernel& /*fc*/, PublicationRecord& /*rec*/ )
196             {}
197
198             /// Does nothing
199             template <typename FCKernel>
200             void wakeup( FCKernel& )
201             {}
202         };
203
204         /// Wait strategy based on the single mutex and the condition variable
205         /**
206             The strategy shares the mutex and conditional variable for all thread.
207
208             Template parameter \p Milliseconds specifies waiting duration;
209             the minimal value is 1.
210         */
211         template <int Milliseconds = 2>
212         class single_mutex_single_condvar
213         {
214         //@cond
215             std::mutex  m_mutex;
216             std::condition_variable m_condvar;
217             bool        m_wakeup;
218
219             typedef std::unique_lock< std::mutex > unique_lock;
220         //@endcond
221
222         public:
223             enum {
224                 c_nWaitMilliseconds = Milliseconds < 1 ? 1 : Milliseconds ///< Waiting duration
225             };
226
227             /// Empty metafunction
228             template <typename PublicationRecord>
229             struct make_publication_record {
230                 typedef PublicationRecord type; ///< publication record type
231             };
232
233             /// Default ctor
234             single_mutex_single_condvar()
235                 : m_wakeup( false )
236             {}
237
238             /// Does nothing
239             template <typename PublicationRecord>
240             void prepare( PublicationRecord& /*rec*/ )
241             {}
242
243             /// Sleeps on condition variable waiting for notification from combiner
244             template <typename FCKernel, typename PublicationRecord>
245             bool wait( FCKernel& fc, PublicationRecord& rec )
246             {
247                 if ( fc.get_operation( rec ) >= req_Operation ) {
248                     unique_lock lock( m_mutex );
249                     if ( fc.get_operation( rec ) >= req_Operation ) {
250                         if ( m_wakeup ) {
251                             m_wakeup = false;
252                             return true;
253                         }
254
255                         bool ret = m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
256                         m_wakeup = false;
257                         return ret;
258                     }
259                 }
260                 return false;
261             }
262
263             /// Calls condition variable function \p notify_all()
264             template <typename FCKernel, typename PublicationRecord>
265             void notify( FCKernel& fc, PublicationRecord& /*rec*/ )
266             {
267                 wakeup( fc );
268             }
269
270             /// Calls condition variable function \p notify_all()
271             template <typename FCKernel>
272             void wakeup( FCKernel& /*fc*/ )
273             {
274                 unique_lock lock( m_mutex );
275                 m_wakeup = true;
276                 m_condvar.notify_all();
277             }
278         };
279
280         /// Wait strategy based on the single mutex and thread-local condition variables
281         /**
282             The strategy shares the mutex, but each thread has its own conditional variable
283
284             Template parameter \p Milliseconds specifies waiting duration;
285             the minimal value is 1.
286         */
287         template <int Milliseconds = 2>
288         class single_mutex_multi_condvar
289         {
290         //@cond
291             std::mutex  m_mutex;
292             bool        m_wakeup;
293
294             typedef std::unique_lock< std::mutex > unique_lock;
295         //@endcond
296
297         public:
298             enum {
299                 c_nWaitMilliseconds = Milliseconds < 1 ? 1 : Milliseconds  ///< Waiting duration
300             };
301
302             /// Incorporates a condition variable into \p PublicationRecord
303             template <typename PublicationRecord>
304             struct make_publication_record {
305                 /// Metafunction result
306                 struct type: public PublicationRecord
307                 {
308                     //@cond
309                     std::condition_variable m_condvar;
310                     //@endcond
311                 };
312             };
313
314             /// Default ctor
315             single_mutex_multi_condvar()
316                 : m_wakeup( false )
317             {}
318
319             /// Does nothing
320             template <typename PublicationRecord>
321             void prepare( PublicationRecord& /*rec*/ )
322             {}
323
324             /// Sleeps on condition variable waiting for notification from combiner
325             template <typename FCKernel, typename PublicationRecord>
326             bool wait( FCKernel& fc, PublicationRecord& rec )
327             {
328                 if ( fc.get_operation( rec ) >= req_Operation ) {
329                     unique_lock lock( m_mutex );
330
331                     if ( fc.get_operation( rec ) >= req_Operation ) {
332                         if ( m_wakeup ) {
333                             m_wakeup = false;
334                             return true;
335                         }
336
337                         bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
338                         m_wakeup = false;
339                         return ret;
340                     }
341                 }
342                 return false;
343             }
344
345             /// Calls condition variable function \p notify_one()
346             template <typename FCKernel, typename PublicationRecord>
347             void notify( FCKernel& /*fc*/, PublicationRecord& rec )
348             {
349                 unique_lock lock( m_mutex );
350                 m_wakeup = true;
351                 rec.m_condvar.notify_one();
352             }
353
354             /// Calls \p fc.wakeup_any() to wake up any pending thread
355             template <typename FCKernel>
356             void wakeup( FCKernel& fc )
357             {
358                 fc.wakeup_any();
359             }
360         };
361
362         /// Wait strategy where each thread has a mutex and a condition variable
363         /**
364             Template parameter \p Milliseconds specifies waiting duration;
365             the minimal value is 1.
366         */
367         template <int Milliseconds = 2>
368         class multi_mutex_multi_condvar
369         {
370         //@cond
371             typedef std::unique_lock< std::mutex > unique_lock;
372         //@endcond
373         public:
374             enum {
375                 c_nWaitMilliseconds = Milliseconds < 1 ? 1 : Milliseconds   ///< Waiting duration
376             };
377
378             /// Incorporates a condition variable and a mutex into \p PublicationRecord
379             template <typename PublicationRecord>
380             struct make_publication_record {
381                 /// Metafunction result
382                 struct type: public PublicationRecord
383                 {
384                     //@cond
385                     std::mutex              m_mutex;
386                     std::condition_variable m_condvar;
387                     bool                    m_wakeup;
388
389                     type()
390                         : m_wakeup( false )
391                     {}
392                     //@endcond
393                 };
394             };
395
396             /// Does nothing
397             template <typename PublicationRecord>
398             void prepare( PublicationRecord& /*rec*/ )
399             {}
400
401             /// Sleeps on condition variable waiting for notification from combiner
402             template <typename FCKernel, typename PublicationRecord>
403             bool wait( FCKernel& fc, PublicationRecord& rec )
404             {
405                 if ( fc.get_operation( rec ) >= req_Operation ) {
406                     unique_lock lock( rec.m_mutex );
407
408                     if ( fc.get_operation( rec ) >= req_Operation ) {
409                         if ( rec.m_wakeup ) {
410                             rec.m_wakeup = false;
411                             return true;
412                         }
413
414                         bool ret = rec.m_condvar.wait_for( lock, std::chrono::milliseconds( c_nWaitMilliseconds )) == std::cv_status::no_timeout;
415                         rec.m_wakeup = false;
416                         return ret;
417                     }
418                 }
419                 return false;
420             }
421
422             /// Calls condition variable function \p notify_one()
423             template <typename FCKernel, typename PublicationRecord>
424             void notify( FCKernel& /*fc*/, PublicationRecord& rec )
425             {
426                 unique_lock lock( rec.m_mutex );
427                 rec.m_wakeup = true;
428                 rec.m_condvar.notify_one();
429             }
430
431             /// Calls \p fc.wakeup_any() to wake up any pending thread
432             template <typename FCKernel>
433             void wakeup( FCKernel& fc )
434             {
435                 fc.wakeup_any();
436             }
437         };
438
439     } // namespace wait_strategy
440 }}} // namespace cds::algo::flat_combining
441
442 #endif //CDSLIB_ALGO_FLAT_COMBINING_WAIT_STRATEGY_H