samples/MapScalabilityTests/MapScalabilityTests.cpp
/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing

  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction

  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/

#include <junction/Core.h>
#include <turf/CPUTimer.h>
#include <turf/Util.h>
#include <turf/extra/UniqueSequence.h>
#include <turf/extra/JobDispatcher.h>
#include <turf/extra/Options.h>
#include <junction/extra/MapAdapter.h>
#include <algorithm>
#include <vector>
#include <stdio.h>

using namespace turf::intTypes;
typedef junction::extra::MapAdapter MapAdapter;

static const ureg NumKeysPerThread = 2000;
static const ureg DefaultReadsPerWrite = 4;
static const ureg DefaultItersPerChunk = 10000;
static const ureg DefaultChunks = 200;
static const u32 Prime = 0x4190ab09; // multiplier that scatters sequential indices across the 32-bit key space

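// State shared by every worker thread during one timed chunk: the map under
// test, the workload parameters, and a flag that thread 0 raises when its
// iterations are done so the other threads stop too.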
struct SharedState {
    MapAdapter& adapter;
    MapAdapter::Map* map;
    ureg numKeysPerThread;
    ureg numThreads;
    ureg readsPerWrite;
    ureg itersPerChunk;
    turf::extra::SpinKicker spinKicker;
    turf::Atomic<u32> doneFlag;

    SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
        : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite),
          itersPerChunk(itersPerChunk) {
        doneFlag.storeNonatomic(0);
        numThreads = 0;
    }
};

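// Per-thread state: the adapter's thread context, the slice of the 32-bit index
// space owned by this thread, cursors for adds/removes, and timing statistics.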
class ThreadState {
public:
    SharedState* m_shared;
    MapAdapter::ThreadContext m_threadCtx;
    ureg m_threadIndex;
    u32 m_rangeLo;
    u32 m_rangeHi;

    u32 m_addIndex;
    u32 m_removeIndex;

    struct Stats {
        ureg mapOpsDone;
        double duration;

        Stats() {
            mapOpsDone = 0;
            duration = 0;
        }

        Stats& operator+=(const Stats& other) {
            mapOpsDone += other.mapOpsDone;
            duration += other.duration;
            return *this;
        }

        bool operator<(const Stats& other) const {
            return duration < other.duration;
        }
    };

    Stats m_stats;

    ThreadState(SharedState* shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
        : m_shared(shared), m_threadCtx(shared->adapter, threadIndex) {
        m_threadIndex = threadIndex;
        m_rangeLo = rangeLo;
        m_rangeHi = rangeHi;
        m_addIndex = rangeLo;
        m_removeIndex = rangeLo;
    }

    void registerThread() {
        m_threadCtx.registerThread();
    }

    void unregisterThread() {
        m_threadCtx.unregisterThread();
    }

    // Pre-populate this thread's share of the map (called from the main thread).
    void initialPopulate() {
        TURF_ASSERT(m_addIndex == m_removeIndex);
        MapAdapter::Map* map = m_shared->map;
        for (ureg i = 0; i < m_shared->numKeysPerThread; i++) {
            u32 key = m_addIndex * Prime;
            if (key >= 2) // keys 0 and 1 are skipped; the stored value equals the key, and small values are reserved by some maps
                map->assign(key, (void*) uptr(key));
            if (++m_addIndex == m_rangeHi)
                m_addIndex = m_rangeLo;
        }
    }

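    // One timed chunk of the mixed workload. Each iteration performs one assign,
    // a batch of readsPerWrite lookups, one erase, and a second batch of lookups.
    // Thread 0 kicks the other threads, runs itersPerChunk iterations, then sets
    // doneFlag so the remaining threads stop as well.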
    void run() {
        MapAdapter::Map* map = m_shared->map;
        turf::CPUTimer::Converter converter;
        Stats stats;
        ureg lookupIndex = m_rangeLo;
        ureg remaining = m_shared->itersPerChunk;
        if (m_threadIndex == 0)
            m_shared->spinKicker.kick(m_shared->numThreads - 1);
        else {
            // Threads other than thread 0 iterate until doneFlag is set.
            remaining = ~u32(0);
            m_shared->spinKicker.waitForKick();
        }

        // ---------
        turf::CPUTimer::Point start = turf::CPUTimer::get();
        for (; remaining > 0; remaining--) {
            // Add
            if (m_shared->doneFlag.load(turf::Relaxed))
                break;
            u32 key = m_addIndex * Prime;
            if (key >= 2) {
                map->assign(key, (void*) uptr(key));
                stats.mapOpsDone++;
            }
            if (++m_addIndex == m_rangeHi)
                m_addIndex = m_rangeLo;

            // Lookup: keep lookupIndex within the window of keys currently in the map
            if (s32(lookupIndex - m_removeIndex) < 0)
                lookupIndex = m_removeIndex;
            for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
                if (m_shared->doneFlag.load(turf::Relaxed))
                    break;
                key = lookupIndex * Prime;
                if (key >= 2) {
                    volatile void* value = map->get(key);
                    TURF_UNUSED(value);
                    stats.mapOpsDone++;
                }
                if (++lookupIndex == m_rangeHi)
                    lookupIndex = m_rangeLo;
                if (lookupIndex == m_addIndex)
                    lookupIndex = m_removeIndex;
            }

            // Remove
            if (m_shared->doneFlag.load(turf::Relaxed))
                break;
            key = m_removeIndex * Prime;
            if (key >= 2) {
                map->erase(key);
                stats.mapOpsDone++;
            }
            if (++m_removeIndex == m_rangeHi)
                m_removeIndex = m_rangeLo;

            // Lookup
            if (s32(lookupIndex - m_removeIndex) < 0)
                lookupIndex = m_removeIndex;
            for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
                if (m_shared->doneFlag.load(turf::Relaxed))
                    break;
                key = lookupIndex * Prime;
                if (key >= 2) {
                    volatile void* value = map->get(key);
                    TURF_UNUSED(value);
                    stats.mapOpsDone++;
                }
                if (++lookupIndex == m_rangeHi)
                    lookupIndex = m_rangeLo;
                if (lookupIndex == m_addIndex)
                    lookupIndex = m_removeIndex;
            }
        }
        if (m_threadIndex == 0)
            m_shared->doneFlag.store(1, turf::Relaxed);
        m_threadCtx.update();
        turf::CPUTimer::Point end = turf::CPUTimer::get();
        // ---------

        stats.duration = converter.toSeconds(end - start);
        m_stats = stats;
    }
};

static const turf::extra::Option Options[] = {
    {"readsPerWrite", 'r', true, "number of reads per write"},
    {"itersPerChunk", 'i', true, "number of iterations per chunk"},
    {"chunks", 'c', true, "number of chunks to execute"},
    {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
};

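// Runs the test at every thread count from 1 up to the number of physical cores
// and prints the results as a Python-style literal: one
// (numThreads, mapOpsDone, totalTime) tuple per thread count.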
int main(int argc, const char** argv) {
    turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
    options.parse(argc, argv);
    ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
    ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
    ureg chunks = options.getInteger("chunks", DefaultChunks);
    double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);

    turf::extra::JobDispatcher dispatcher;
    ureg numCores = dispatcher.getNumPhysicalCores();
    TURF_ASSERT(numCores > 0);
    MapAdapter adapter(numCores);

    // Create shared state and register first thread
    SharedState shared(adapter, NumKeysPerThread, readsPerWrite, itersPerChunk);
    std::vector<ThreadState> threads;
    threads.reserve(numCores);
    for (ureg t = 0; t < numCores; t++) {
        // Give each thread a disjoint slice of the 32-bit index space, starting at 1
        u32 rangeLo = 0xffffffffu / numCores * t + 1;
        u32 rangeHi = 0xffffffffu / numCores * (t + 1) + 1;
        threads.push_back(ThreadState(&shared, t, rangeLo, rangeHi));
    }
    dispatcher.kickOne(0, &ThreadState::registerThread, threads[0]);

    {
        // Create the map and populate it entirely from main thread
        MapAdapter::Map map(MapAdapter::getInitialCapacity(numCores * NumKeysPerThread));
        shared.map = &map;
        for (ureg t = 0; t < numCores; t++) {
            threads[t].initialPopulate();
        }

        printf("{\n");
        printf("'mapType': '%s',\n", MapAdapter::getMapName());
        printf("'population': %d,\n", (int) (numCores * NumKeysPerThread));
        printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
        printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
        printf("'chunks': %d,\n", (int) chunks);
        printf("'keepChunkFraction': %f,\n", keepChunkFraction);
        printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n");
        printf("'points': [\n");
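        // Sweep the thread count from 1 up to numCores, adding one registered
        // thread per step and timing `chunks` chunks at each count.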
        for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
            if (shared.numThreads > 1) {
                // Spawn and register a new thread
                dispatcher.kickOne(shared.numThreads - 1, &ThreadState::registerThread, threads[shared.numThreads - 1]);
            }

            std::vector<ThreadState::Stats> kickTotals;
            for (ureg c = 0; c < chunks; c++) {
                shared.doneFlag.storeNonatomic(0);
                dispatcher.kickMulti(&ThreadState::run, &threads[0], shared.numThreads);

                ThreadState::Stats kickTotal;
                for (ureg t = 0; t < shared.numThreads; t++)
                    kickTotal += threads[t].m_stats;
                kickTotals.push_back(kickTotal);
            }

            // Sort chunks by duration and keep only the fastest fraction to reduce noise
            std::sort(kickTotals.begin(), kickTotals.end());
            ThreadState::Stats totals;
            for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
                totals += kickTotals[t];
            }

            printf("    (%d, %d, %f),\n", int(shared.numThreads), int(totals.mapOpsDone), totals.duration);
        }
        printf("],\n");
        printf("}\n");

        shared.map = NULL;
    }

    dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
    return 0;
}