Merge pull request #1 from richardkogelnig/patch-1
[junction.git] / samples / MapScalabilityTests / MapScalabilityTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <junction/Averager.h>
19 #include <turf/extra/Options.h>
20 #include <junction/extra/MapAdapter.h>
21
22 using namespace turf::intTypes;
23 typedef junction::extra::MapAdapter MapAdapter;
24
25 static const ureg NumKeysPerThread = 2000;
26 static const ureg DefaultReadsPerWrite = 4;
27 static const ureg DefaultItersPerChunk = 10000;
28 static const ureg DefaultChunks = 200;
29 static const u32 Prime = 0x4190ab09;
30
31 struct SharedState {
32     MapAdapter& adapter;
33     MapAdapter::Map* map;
34     ureg numKeysPerThread;
35     ureg numThreads;
36     ureg readsPerWrite;
37     ureg itersPerChunk;
38     turf::extra::SpinKicker spinKicker;
39     turf::Atomic<u32> doneFlag;
40
41     SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
42         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
43         doneFlag.storeNonatomic(0);
44         numThreads = 0;
45     }
46 };
47
48 class ThreadState {
49 public:
50     SharedState& m_shared;
51     MapAdapter::ThreadContext m_threadCtx;
52     ureg m_threadIndex;
53     u32 m_rangeLo;
54     u32 m_rangeHi;
55
56     u32 m_addIndex;
57     u32 m_removeIndex;
58
59     struct Stats {
60         ureg mapOpsDone;
61         double duration;
62
63         Stats() {
64             mapOpsDone = 0;
65             duration = 0;
66         }
67
68         Stats& operator+=(const Stats& other) {
69             mapOpsDone += other.mapOpsDone;
70             duration += other.duration;
71             return *this;
72         }
73
74         bool operator<(const Stats& other) const {
75             return duration < other.duration;
76         }
77     };
78
79     Stats m_stats;
80
81     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
82         m_threadIndex = threadIndex;
83         m_rangeLo = rangeLo;
84         m_rangeHi = rangeHi;
85         m_addIndex = rangeLo;
86         m_removeIndex = rangeLo;
87     }
88
89     void registerThread() {
90         m_threadCtx.registerThread();
91     }
92
93     void unregisterThread() {
94         m_threadCtx.unregisterThread();
95     }
96
97     void initialPopulate() {
98         TURF_ASSERT(m_addIndex == m_removeIndex);
99         MapAdapter::Map *map = m_shared.map;
100         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
101             u32 key = m_addIndex * Prime;
102             if (key >= 2)
103                 map->insert(key, (void*) uptr(key));
104             if (++m_addIndex == m_rangeHi)
105                 m_addIndex = m_rangeLo;
106         }
107     }
108
109     void run() {
110         MapAdapter::Map *map = m_shared.map;
111         turf::CPUTimer::Converter converter;
112         Stats stats;
113         ureg lookupIndex = m_rangeLo;
114         ureg remaining = m_shared.itersPerChunk;
115         if (m_threadIndex == 0)
116             m_shared.spinKicker.kick(m_shared.numThreads - 1);
117         else {
118             remaining = ~u32(0);
119             m_shared.spinKicker.waitForKick();
120         }
121
122         // ---------
123         turf::CPUTimer::Point start = turf::CPUTimer::get();
124         for (; remaining > 0; remaining--) {
125             // Add
126             if (m_shared.doneFlag.load(turf::Relaxed))
127                 break;
128             u32 key = m_addIndex * Prime;
129             if (key >= 2) {
130                 map->insert(key, (void*) uptr(key));
131                 stats.mapOpsDone++;
132             }
133             if (++m_addIndex == m_rangeHi)
134                 m_addIndex = m_rangeLo;
135
136             // Lookup
137             if (s32(lookupIndex - m_removeIndex) < 0)
138                 lookupIndex = m_removeIndex;
139             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
140                 if (m_shared.doneFlag.load(turf::Relaxed))
141                     break;
142                 key = lookupIndex * Prime;
143                 if (key >= 2) {
144                     volatile void* value = map->get(key);
145                     TURF_UNUSED(value);
146                     stats.mapOpsDone++;
147                 }
148                 if (++lookupIndex == m_rangeHi)
149                     lookupIndex = m_rangeLo;
150                 if (lookupIndex == m_addIndex)
151                     lookupIndex = m_removeIndex;
152             }
153
154             // Remove
155             if (m_shared.doneFlag.load(turf::Relaxed))
156                 break;
157             key = m_removeIndex * Prime;
158             if (key >= 2) {
159                 map->erase(key);
160                 stats.mapOpsDone++;
161             }
162             if (++m_removeIndex == m_rangeHi)
163                 m_removeIndex = m_rangeLo;
164
165             // Lookup
166             if (s32(lookupIndex - m_removeIndex) < 0)
167                 lookupIndex = m_removeIndex;
168             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
169                 if (m_shared.doneFlag.load(turf::Relaxed))
170                     break;
171                 key = lookupIndex * Prime;
172                 if (key >= 2) {
173                     volatile void* value = map->get(key);
174                     TURF_UNUSED(value);
175                     stats.mapOpsDone++;
176                 }
177                 if (++lookupIndex == m_rangeHi)
178                     lookupIndex = m_rangeLo;
179                 if (lookupIndex == m_addIndex)
180                     lookupIndex = m_removeIndex;
181             }
182         }
183         if (m_threadIndex == 0)
184             m_shared.doneFlag.store(1, turf::Relaxed);
185         m_threadCtx.update();
186         turf::CPUTimer::Point end = turf::CPUTimer::get();
187         // ---------
188
189         stats.duration = converter.toSeconds(end - start);
190         m_stats = stats;
191     }
192 };
193
194 static const turf::extra::Option Options[] = {
195     { "readsPerWrite", 'r', true, "number of reads per write" },
196     { "itersPerChunk", 'i', true, "number of iterations per chunk" },
197     { "chunks", 'c', true, "number of chunks to execute" },
198     { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
199 };
200
201 int main(int argc, const char** argv) {
202     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
203     options.parse(argc, argv);
204     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);    
205     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
206     ureg chunks = options.getInteger("chunks", DefaultChunks);
207     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
208
209     turf::extra::JobDispatcher dispatcher;
210     ureg numCores = dispatcher.getNumPhysicalCores();
211     TURF_ASSERT(numCores > 0);
212     MapAdapter adapter(numCores);
213
214     // Create shared state and register first thread
215     SharedState shared(adapter, NumKeysPerThread, readsPerWrite, itersPerChunk);
216     std::vector<ThreadState> threads;
217     threads.reserve(numCores);
218     for (ureg t = 0; t < numCores; t++) {
219         u32 rangeLo = 0xffffffffu / numCores * t + 1;
220         u32 rangeHi = 0xffffffffu / numCores * (t + 1) + 1;
221         threads.emplace_back(shared, t, rangeLo, rangeHi);
222     }
223     dispatcher.kickOne(0, &ThreadState::registerThread, threads[0]);
224
225     {
226         // Create the map and populate it entirely from main thread
227         MapAdapter::Map map(MapAdapter::getInitialCapacity(numCores * NumKeysPerThread));
228         shared.map = &map;
229         for (ureg t = 0; t < numCores; t++) {
230             threads[t].initialPopulate();
231         }
232
233         printf("{\n");
234         printf("'mapType': '%s',\n", MapAdapter::MapName);
235         printf("'population': %d,\n", (int) (numCores * NumKeysPerThread));
236         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
237         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
238         printf("'chunks': %d,\n", (int) chunks);
239         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
240         printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"),
241         printf("'points': [\n");
242         for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
243             if (shared.numThreads > 1) {
244                 // Spawn and register a new thread
245                 dispatcher.kickOne(shared.numThreads - 1, &ThreadState::registerThread, threads[shared.numThreads - 1]);
246             }
247
248             std::vector<ThreadState::Stats> kickTotals;
249             for (ureg c = 0; c < chunks; c++) {
250                 shared.doneFlag.storeNonatomic(false);
251                 dispatcher.kickMulti(&ThreadState::run, &threads[0], shared.numThreads);
252
253                 ThreadState::Stats kickTotal;
254                 for (ureg t = 0; t < shared.numThreads; t++)
255                     kickTotal += threads[t].m_stats;
256                 kickTotals.push_back(kickTotal);
257             }
258
259             std::sort(kickTotals.begin(), kickTotals.end());
260             ThreadState::Stats totals;
261             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
262                 totals += kickTotals[t];
263             }
264
265             printf("    (%d, %d, %f),\n",
266                 int(shared.numThreads),
267                 int(totals.mapOpsDone),
268                 totals.duration);
269         }
270         printf("],\n");
271         printf("}\n");
272
273         shared.map = NULL;
274     }
275
276     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
277     return 0;
278 }