a9fa58b48052b2c621363bcc43119584ab68f955
[junction.git] / samples / MapScalabilityTests / MapScalabilityTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <turf/extra/Options.h>
19 #include <junction/extra/MapAdapter.h>
20 #include <algorithm>
21 #include <vector>
22
23 using namespace turf::intTypes;
24 typedef junction::extra::MapAdapter MapAdapter;
25
26 static const ureg NumKeysPerThread = 2000;
27 static const ureg DefaultReadsPerWrite = 4;
28 static const ureg DefaultItersPerChunk = 10000;
29 static const ureg DefaultChunks = 200;
30 static const u32 Prime = 0x4190ab09;
31
32 struct SharedState {
33     MapAdapter& adapter;
34     MapAdapter::Map* map;
35     ureg numKeysPerThread;
36     ureg numThreads;
37     ureg readsPerWrite;
38     ureg itersPerChunk;
39     turf::extra::SpinKicker spinKicker;
40     turf::Atomic<u32> doneFlag;
41
42     SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
43         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite),
44           itersPerChunk(itersPerChunk) {
45         doneFlag.storeNonatomic(0);
46         numThreads = 0;
47     }
48 };
49
50 class ThreadState {
51 public:
52     SharedState& m_shared;
53     MapAdapter::ThreadContext m_threadCtx;
54     ureg m_threadIndex;
55     u32 m_rangeLo;
56     u32 m_rangeHi;
57
58     u32 m_addIndex;
59     u32 m_removeIndex;
60
61     struct Stats {
62         ureg mapOpsDone;
63         double duration;
64
65         Stats() {
66             mapOpsDone = 0;
67             duration = 0;
68         }
69
70         Stats& operator+=(const Stats& other) {
71             mapOpsDone += other.mapOpsDone;
72             duration += other.duration;
73             return *this;
74         }
75
76         bool operator<(const Stats& other) const {
77             return duration < other.duration;
78         }
79     };
80
81     Stats m_stats;
82
83     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
84         : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
85         m_threadIndex = threadIndex;
86         m_rangeLo = rangeLo;
87         m_rangeHi = rangeHi;
88         m_addIndex = rangeLo;
89         m_removeIndex = rangeLo;
90     }
91
92     void registerThread() {
93         m_threadCtx.registerThread();
94     }
95
96     void unregisterThread() {
97         m_threadCtx.unregisterThread();
98     }
99
100     void initialPopulate() {
101         TURF_ASSERT(m_addIndex == m_removeIndex);
102         MapAdapter::Map* map = m_shared.map;
103         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
104             u32 key = m_addIndex * Prime;
105             if (key >= 2)
106                 map->assign(key, (void*) uptr(key));
107             if (++m_addIndex == m_rangeHi)
108                 m_addIndex = m_rangeLo;
109         }
110     }
111
112     void run() {
113         MapAdapter::Map* map = m_shared.map;
114         turf::CPUTimer::Converter converter;
115         Stats stats;
116         ureg lookupIndex = m_rangeLo;
117         ureg remaining = m_shared.itersPerChunk;
118         if (m_threadIndex == 0)
119             m_shared.spinKicker.kick(m_shared.numThreads - 1);
120         else {
121             remaining = ~u32(0);
122             m_shared.spinKicker.waitForKick();
123         }
124
125         // ---------
126         turf::CPUTimer::Point start = turf::CPUTimer::get();
127         for (; remaining > 0; remaining--) {
128             // Add
129             if (m_shared.doneFlag.load(turf::Relaxed))
130                 break;
131             u32 key = m_addIndex * Prime;
132             if (key >= 2) {
133                 map->assign(key, (void*) uptr(key));
134                 stats.mapOpsDone++;
135             }
136             if (++m_addIndex == m_rangeHi)
137                 m_addIndex = m_rangeLo;
138
139             // Lookup
140             if (s32(lookupIndex - m_removeIndex) < 0)
141                 lookupIndex = m_removeIndex;
142             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
143                 if (m_shared.doneFlag.load(turf::Relaxed))
144                     break;
145                 key = lookupIndex * Prime;
146                 if (key >= 2) {
147                     volatile void* value = map->get(key);
148                     TURF_UNUSED(value);
149                     stats.mapOpsDone++;
150                 }
151                 if (++lookupIndex == m_rangeHi)
152                     lookupIndex = m_rangeLo;
153                 if (lookupIndex == m_addIndex)
154                     lookupIndex = m_removeIndex;
155             }
156
157             // Remove
158             if (m_shared.doneFlag.load(turf::Relaxed))
159                 break;
160             key = m_removeIndex * Prime;
161             if (key >= 2) {
162                 map->erase(key);
163                 stats.mapOpsDone++;
164             }
165             if (++m_removeIndex == m_rangeHi)
166                 m_removeIndex = m_rangeLo;
167
168             // Lookup
169             if (s32(lookupIndex - m_removeIndex) < 0)
170                 lookupIndex = m_removeIndex;
171             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
172                 if (m_shared.doneFlag.load(turf::Relaxed))
173                     break;
174                 key = lookupIndex * Prime;
175                 if (key >= 2) {
176                     volatile void* value = map->get(key);
177                     TURF_UNUSED(value);
178                     stats.mapOpsDone++;
179                 }
180                 if (++lookupIndex == m_rangeHi)
181                     lookupIndex = m_rangeLo;
182                 if (lookupIndex == m_addIndex)
183                     lookupIndex = m_removeIndex;
184             }
185         }
186         if (m_threadIndex == 0)
187             m_shared.doneFlag.store(1, turf::Relaxed);
188         m_threadCtx.update();
189         turf::CPUTimer::Point end = turf::CPUTimer::get();
190         // ---------
191
192         stats.duration = converter.toSeconds(end - start);
193         m_stats = stats;
194     }
195 };
196
197 static const turf::extra::Option Options[] = {
198     {"readsPerWrite", 'r', true, "number of reads per write"},
199     {"itersPerChunk", 'i', true, "number of iterations per chunk"},
200     {"chunks", 'c', true, "number of chunks to execute"},
201     {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
202 };
203
204 int main(int argc, const char** argv) {
205     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
206     options.parse(argc, argv);
207     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
208     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
209     ureg chunks = options.getInteger("chunks", DefaultChunks);
210     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
211
212     turf::extra::JobDispatcher dispatcher;
213     ureg numCores = dispatcher.getNumPhysicalCores();
214     TURF_ASSERT(numCores > 0);
215     MapAdapter adapter(numCores);
216
217     // Create shared state and register first thread
218     SharedState shared(adapter, NumKeysPerThread, readsPerWrite, itersPerChunk);
219     std::vector<ThreadState> threads;
220     threads.reserve(numCores);
221     for (ureg t = 0; t < numCores; t++) {
222         u32 rangeLo = 0xffffffffu / numCores * t + 1;
223         u32 rangeHi = 0xffffffffu / numCores * (t + 1) + 1;
224         threads.emplace_back(shared, t, rangeLo, rangeHi);
225     }
226     dispatcher.kickOne(0, &ThreadState::registerThread, threads[0]);
227
228     {
229         // Create the map and populate it entirely from main thread
230         MapAdapter::Map map(MapAdapter::getInitialCapacity(numCores * NumKeysPerThread));
231         shared.map = &map;
232         for (ureg t = 0; t < numCores; t++) {
233             threads[t].initialPopulate();
234         }
235
236         printf("{\n");
237         printf("'mapType': '%s',\n", MapAdapter::MapName);
238         printf("'population': %d,\n", (int) (numCores * NumKeysPerThread));
239         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
240         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
241         printf("'chunks': %d,\n", (int) chunks);
242         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
243         printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"), printf("'points': [\n");
244         for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
245             if (shared.numThreads > 1) {
246                 // Spawn and register a new thread
247                 dispatcher.kickOne(shared.numThreads - 1, &ThreadState::registerThread, threads[shared.numThreads - 1]);
248             }
249
250             std::vector<ThreadState::Stats> kickTotals;
251             for (ureg c = 0; c < chunks; c++) {
252                 shared.doneFlag.storeNonatomic(false);
253                 dispatcher.kickMulti(&ThreadState::run, &threads[0], shared.numThreads);
254
255                 ThreadState::Stats kickTotal;
256                 for (ureg t = 0; t < shared.numThreads; t++)
257                     kickTotal += threads[t].m_stats;
258                 kickTotals.push_back(kickTotal);
259             }
260
261             std::sort(kickTotals.begin(), kickTotals.end());
262             ThreadState::Stats totals;
263             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
264                 totals += kickTotals[t];
265             }
266
267             printf("    (%d, %d, %f),\n", int(shared.numThreads), int(totals.mapOpsDone), totals.duration);
268         }
269         printf("],\n");
270         printf("}\n");
271
272         shared.map = NULL;
273     }
274
275     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
276     return 0;
277 }