Delete JunctionProjectDefs.cmake; simplify AddSample.cmake and move it to samples
[junction.git] / samples / MapScalabilityTests / MapScalabilityTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <turf/extra/Options.h>
19 #include <junction/extra/MapAdapter.h>
20 #include <algorithm>
21 #include <vector>
22
23 using namespace turf::intTypes;
24 typedef junction::extra::MapAdapter MapAdapter;
25
26 static const ureg NumKeysPerThread = 2000;
27 static const ureg DefaultReadsPerWrite = 4;
28 static const ureg DefaultItersPerChunk = 10000;
29 static const ureg DefaultChunks = 200;
30 static const u32 Prime = 0x4190ab09;
31
32 struct SharedState {
33     MapAdapter& adapter;
34     MapAdapter::Map* map;
35     ureg numKeysPerThread;
36     ureg numThreads;
37     ureg readsPerWrite;
38     ureg itersPerChunk;
39     turf::extra::SpinKicker spinKicker;
40     turf::Atomic<u32> doneFlag;
41
42     SharedState(MapAdapter& adapter, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
43         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
44         doneFlag.storeNonatomic(0);
45         numThreads = 0;
46     }
47 };
48
49 class ThreadState {
50 public:
51     SharedState& m_shared;
52     MapAdapter::ThreadContext m_threadCtx;
53     ureg m_threadIndex;
54     u32 m_rangeLo;
55     u32 m_rangeHi;
56
57     u32 m_addIndex;
58     u32 m_removeIndex;
59
60     struct Stats {
61         ureg mapOpsDone;
62         double duration;
63
64         Stats() {
65             mapOpsDone = 0;
66             duration = 0;
67         }
68
69         Stats& operator+=(const Stats& other) {
70             mapOpsDone += other.mapOpsDone;
71             duration += other.duration;
72             return *this;
73         }
74
75         bool operator<(const Stats& other) const {
76             return duration < other.duration;
77         }
78     };
79
80     Stats m_stats;
81
82     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
83         m_threadIndex = threadIndex;
84         m_rangeLo = rangeLo;
85         m_rangeHi = rangeHi;
86         m_addIndex = rangeLo;
87         m_removeIndex = rangeLo;
88     }
89
90     void registerThread() {
91         m_threadCtx.registerThread();
92     }
93
94     void unregisterThread() {
95         m_threadCtx.unregisterThread();
96     }
97
98     void initialPopulate() {
99         TURF_ASSERT(m_addIndex == m_removeIndex);
100         MapAdapter::Map *map = m_shared.map;
101         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
102             u32 key = m_addIndex * Prime;
103             if (key >= 2)
104                 map->insert(key, (void*) uptr(key));
105             if (++m_addIndex == m_rangeHi)
106                 m_addIndex = m_rangeLo;
107         }
108     }
109
110     void run() {
111         MapAdapter::Map *map = m_shared.map;
112         turf::CPUTimer::Converter converter;
113         Stats stats;
114         ureg lookupIndex = m_rangeLo;
115         ureg remaining = m_shared.itersPerChunk;
116         if (m_threadIndex == 0)
117             m_shared.spinKicker.kick(m_shared.numThreads - 1);
118         else {
119             remaining = ~u32(0);
120             m_shared.spinKicker.waitForKick();
121         }
122
123         // ---------
124         turf::CPUTimer::Point start = turf::CPUTimer::get();
125         for (; remaining > 0; remaining--) {
126             // Add
127             if (m_shared.doneFlag.load(turf::Relaxed))
128                 break;
129             u32 key = m_addIndex * Prime;
130             if (key >= 2) {
131                 map->insert(key, (void*) uptr(key));
132                 stats.mapOpsDone++;
133             }
134             if (++m_addIndex == m_rangeHi)
135                 m_addIndex = m_rangeLo;
136
137             // Lookup
138             if (s32(lookupIndex - m_removeIndex) < 0)
139                 lookupIndex = m_removeIndex;
140             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
141                 if (m_shared.doneFlag.load(turf::Relaxed))
142                     break;
143                 key = lookupIndex * Prime;
144                 if (key >= 2) {
145                     volatile void* value = map->get(key);
146                     TURF_UNUSED(value);
147                     stats.mapOpsDone++;
148                 }
149                 if (++lookupIndex == m_rangeHi)
150                     lookupIndex = m_rangeLo;
151                 if (lookupIndex == m_addIndex)
152                     lookupIndex = m_removeIndex;
153             }
154
155             // Remove
156             if (m_shared.doneFlag.load(turf::Relaxed))
157                 break;
158             key = m_removeIndex * Prime;
159             if (key >= 2) {
160                 map->erase(key);
161                 stats.mapOpsDone++;
162             }
163             if (++m_removeIndex == m_rangeHi)
164                 m_removeIndex = m_rangeLo;
165
166             // Lookup
167             if (s32(lookupIndex - m_removeIndex) < 0)
168                 lookupIndex = m_removeIndex;
169             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
170                 if (m_shared.doneFlag.load(turf::Relaxed))
171                     break;
172                 key = lookupIndex * Prime;
173                 if (key >= 2) {
174                     volatile void* value = map->get(key);
175                     TURF_UNUSED(value);
176                     stats.mapOpsDone++;
177                 }
178                 if (++lookupIndex == m_rangeHi)
179                     lookupIndex = m_rangeLo;
180                 if (lookupIndex == m_addIndex)
181                     lookupIndex = m_removeIndex;
182             }
183         }
184         if (m_threadIndex == 0)
185             m_shared.doneFlag.store(1, turf::Relaxed);
186         m_threadCtx.update();
187         turf::CPUTimer::Point end = turf::CPUTimer::get();
188         // ---------
189
190         stats.duration = converter.toSeconds(end - start);
191         m_stats = stats;
192     }
193 };
194
195 static const turf::extra::Option Options[] = {
196     { "readsPerWrite", 'r', true, "number of reads per write" },
197     { "itersPerChunk", 'i', true, "number of iterations per chunk" },
198     { "chunks", 'c', true, "number of chunks to execute" },
199     { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
200 };
201
202 int main(int argc, const char** argv) {
203     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
204     options.parse(argc, argv);
205     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);    
206     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
207     ureg chunks = options.getInteger("chunks", DefaultChunks);
208     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
209
210     turf::extra::JobDispatcher dispatcher;
211     ureg numCores = dispatcher.getNumPhysicalCores();
212     TURF_ASSERT(numCores > 0);
213     MapAdapter adapter(numCores);
214
215     // Create shared state and register first thread
216     SharedState shared(adapter, NumKeysPerThread, readsPerWrite, itersPerChunk);
217     std::vector<ThreadState> threads;
218     threads.reserve(numCores);
219     for (ureg t = 0; t < numCores; t++) {
220         u32 rangeLo = 0xffffffffu / numCores * t + 1;
221         u32 rangeHi = 0xffffffffu / numCores * (t + 1) + 1;
222         threads.emplace_back(shared, t, rangeLo, rangeHi);
223     }
224     dispatcher.kickOne(0, &ThreadState::registerThread, threads[0]);
225
226     {
227         // Create the map and populate it entirely from main thread
228         MapAdapter::Map map(MapAdapter::getInitialCapacity(numCores * NumKeysPerThread));
229         shared.map = &map;
230         for (ureg t = 0; t < numCores; t++) {
231             threads[t].initialPopulate();
232         }
233
234         printf("{\n");
235         printf("'mapType': '%s',\n", MapAdapter::MapName);
236         printf("'population': %d,\n", (int) (numCores * NumKeysPerThread));
237         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
238         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
239         printf("'chunks': %d,\n", (int) chunks);
240         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
241         printf("'labels': ('numThreads', 'mapOpsDone', 'totalTime'),\n"),
242         printf("'points': [\n");
243         for (shared.numThreads = 1; shared.numThreads <= numCores; shared.numThreads++) {
244             if (shared.numThreads > 1) {
245                 // Spawn and register a new thread
246                 dispatcher.kickOne(shared.numThreads - 1, &ThreadState::registerThread, threads[shared.numThreads - 1]);
247             }
248
249             std::vector<ThreadState::Stats> kickTotals;
250             for (ureg c = 0; c < chunks; c++) {
251                 shared.doneFlag.storeNonatomic(false);
252                 dispatcher.kickMulti(&ThreadState::run, &threads[0], shared.numThreads);
253
254                 ThreadState::Stats kickTotal;
255                 for (ureg t = 0; t < shared.numThreads; t++)
256                     kickTotal += threads[t].m_stats;
257                 kickTotals.push_back(kickTotal);
258             }
259
260             std::sort(kickTotals.begin(), kickTotals.end());
261             ThreadState::Stats totals;
262             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
263                 totals += kickTotals[t];
264             }
265
266             printf("    (%d, %d, %f),\n",
267                 int(shared.numThreads),
268                 int(totals.mapOpsDone),
269                 totals.duration);
270         }
271         printf("],\n");
272         printf("}\n");
273
274         shared.map = NULL;
275     }
276
277     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
278     return 0;
279 }