Initial commit
[junction.git] / junction / SimpleJobCoordinator.h
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_SIMPLEJOBCOORDINATOR_H
14 #define JUNCTION_SIMPLEJOBCOORDINATOR_H
15
16 #include <junction/Core.h>
17 #include <junction/striped/ConditionBank.h>
18
19 namespace junction {
20
21 // It's safe to call everything here from within a Job itself.
22 // In particular, you're allowed to participate() recursively.
23 // We actually do this in ConcurrentMap_Grampa::publish() when migrating a new flattree.
24 class SimpleJobCoordinator {
25 public:
26     struct Job {
27         virtual ~Job() {
28         }
29         virtual void run() = 0;
30     };
31
32 private:
33     JUNCTION_STRIPED_CONDITIONBANK_DEFINE_MEMBER()
34     turf::Atomic<uptr> m_job;
35
36 public:
37     SimpleJobCoordinator() : m_job(uptr(NULL)) {
38     }
39
40     Job* loadConsume() const {
41         return (Job*) m_job.load(turf::Consume);
42     }
43
44     void storeRelease(Job* job) {
45         junction::striped::ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this);
46         {
47             turf::LockGuard<turf::Mutex> guard(pair.mutex);
48             m_job.store(uptr(job), turf::Release);
49         }
50         pair.condVar.wakeAll();
51     }
52
53     void participate() {
54         junction::striped::ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this);
55         uptr prevJob = uptr(NULL);
56         for (;;) {
57             uptr job = m_job.load(turf::Consume);
58             if (job == prevJob) {
59                 turf::LockGuard<turf::Mutex> guard(pair.mutex);
60                 for (;;) {
61                     job = m_job.loadNonatomic();    // No concurrent writes inside lock
62                     if (job != prevJob)
63                         break;
64                     pair.condVar.wait(guard);
65                 }
66             }
67             if (job == 1)
68                 return;
69             reinterpret_cast<Job*>(job)->run();
70             prevJob = job;
71         }
72     }
73
74     void runOne(Job* job) {
75         TURF_ASSERT(job != (Job*) m_job.load(turf::Relaxed));
76         storeRelease(job);
77         job->run();
78     }
79
80     void end() {
81         junction::striped::ConditionPair& pair = JUNCTION_STRIPED_CONDITIONBANK_GET(this);
82         {
83             turf::LockGuard<turf::Mutex> guard(pair.mutex);
84             m_job.store(1, turf::Release);
85         }
86         pair.condVar.wakeAll();
87     }
88 };
89
90 } // namespace junction
91
92 #endif // JUNCTION_SIMPLEJOBCOORDINATOR_H