d0a5e3d583214300f21f6800479928a371052315
[junction.git] / samples / MapPerformanceTests / MapPerformanceTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <turf/extra/Options.h>
19 #include <junction/extra/MapAdapter.h>
20 #include <algorithm>
21 #include <vector>
22
23 using namespace turf::intTypes;
24 typedef junction::extra::MapAdapter MapAdapter;
25
26 static const ureg NumKeysPerThread = 16384;
27 static const ureg DefaultReadsPerWrite = 19;
28 static const ureg DefaultItersPerChunk = 128;
29 static const ureg DefaultChunks = 10;
30 static const u32 Prime = 0x4190ab09;
31
32 class Delay {
33 private:
34     turf::extra::Random m_rand;
35     u32 m_threshold;
36
37 public:
38     Delay(float ratio) {
39         m_threshold = u32(double(0xffffffffu) * ratio);
40     }
41
42     void delay(ureg& workUnits) {
43         for (;;) {
44             volatile ureg v = m_rand.next32();
45             workUnits++;
46             if (v <= m_threshold)
47                 break;
48         }
49     }
50 };
51
52 struct SharedState {
53     MapAdapter& adapter;
54     MapAdapter::Map* map;
55     ureg numKeysPerThread;
56     float delayFactor;
57     ureg numThreads;
58     ureg readsPerWrite;
59     ureg itersPerChunk;
60     turf::extra::SpinKicker spinKicker;
61     turf::Atomic<u32> doneFlag;
62
63     SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
64         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite),
65           itersPerChunk(itersPerChunk) {
66         delayFactor = 0.5f;
67         doneFlag.storeNonatomic(0);
68     }
69 };
70
71 class ThreadState {
72 public:
73     SharedState& m_shared;
74     MapAdapter::ThreadContext m_threadCtx;
75     ureg m_threadIndex;
76     u32 m_rangeLo;
77     u32 m_rangeHi;
78
79     u32 m_addIndex;
80     u32 m_removeIndex;
81
82     struct Stats {
83         ureg workUnitsDone;
84         ureg mapOpsDone;
85         double duration;
86
87         Stats() {
88             workUnitsDone = 0;
89             mapOpsDone = 0;
90             duration = 0;
91         }
92
93         Stats& operator+=(const Stats& other) {
94             workUnitsDone += other.workUnitsDone;
95             mapOpsDone += other.mapOpsDone;
96             duration += other.duration;
97             return *this;
98         }
99
100         bool operator<(const Stats& other) const {
101             return duration < other.duration;
102         }
103     };
104
105     Stats m_stats;
106
107     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
108         : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
109         m_threadIndex = threadIndex;
110         m_rangeLo = rangeLo;
111         m_rangeHi = rangeHi;
112         m_addIndex = rangeLo;
113         m_removeIndex = rangeLo;
114     }
115
116     void registerThread() {
117         m_threadCtx.registerThread();
118     }
119
120     void unregisterThread() {
121         m_threadCtx.unregisterThread();
122     }
123
124     void initialPopulate() {
125         TURF_ASSERT(m_addIndex == m_removeIndex);
126         MapAdapter::Map* map = m_shared.map;
127         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
128             u32 key = m_addIndex * Prime;
129             map->set(key, (void*) (key & ~uptr(3)));
130             if (++m_addIndex == m_rangeHi)
131                 m_addIndex = m_rangeLo;
132         }
133     }
134
135     void run() {
136         MapAdapter::Map* map = m_shared.map;
137         turf::CPUTimer::Converter converter;
138         Delay delay(m_shared.delayFactor);
139         Stats stats;
140         ureg lookupIndex = m_rangeLo;
141         ureg remaining = m_shared.itersPerChunk;
142         if (m_threadIndex == 0)
143             m_shared.spinKicker.kick(m_shared.numThreads - 1);
144         else {
145             remaining = ~u32(0);
146             m_shared.spinKicker.waitForKick();
147         }
148
149         // ---------
150         turf::CPUTimer::Point start = turf::CPUTimer::get();
151         for (; remaining > 0; remaining--) {
152             // Add
153             delay.delay(stats.workUnitsDone);
154             if (m_shared.doneFlag.load(turf::Relaxed))
155                 break;
156             u32 key = m_addIndex * Prime;
157             if (key >= 2) {
158                 map->set(key, (void*) uptr(key));
159                 stats.mapOpsDone++;
160             }
161             if (++m_addIndex == m_rangeHi)
162                 m_addIndex = m_rangeLo;
163
164             // Lookup
165             if (s32(lookupIndex - m_removeIndex) < 0)
166                 lookupIndex = m_removeIndex;
167             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
168                 delay.delay(stats.workUnitsDone);
169                 if (m_shared.doneFlag.load(turf::Relaxed))
170                     break;
171                 key = lookupIndex * Prime;
172                 if (key >= 2) {
173                     volatile void* value = map->get(key);
174                     TURF_UNUSED(value);
175                     stats.mapOpsDone++;
176                 }
177                 if (++lookupIndex == m_rangeHi)
178                     lookupIndex = m_rangeLo;
179                 if (lookupIndex == m_addIndex)
180                     lookupIndex = m_removeIndex;
181             }
182
183             // Remove
184             delay.delay(stats.workUnitsDone);
185             if (m_shared.doneFlag.load(turf::Relaxed))
186                 break;
187             key = m_removeIndex * Prime;
188             if (key >= 2) {
189                 map->erase(key);
190                 stats.mapOpsDone++;
191             }
192             if (++m_removeIndex == m_rangeHi)
193                 m_removeIndex = m_rangeLo;
194
195             // Lookup
196             if (s32(lookupIndex - m_removeIndex) < 0)
197                 lookupIndex = m_removeIndex;
198             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
199                 delay.delay(stats.workUnitsDone);
200                 if (m_shared.doneFlag.load(turf::Relaxed))
201                     break;
202                 key = lookupIndex * Prime;
203                 if (key >= 2) {
204                     volatile void* value = map->get(key);
205                     TURF_UNUSED(value);
206                     stats.mapOpsDone++;
207                 }
208                 if (++lookupIndex == m_rangeHi)
209                     lookupIndex = m_rangeLo;
210                 if (lookupIndex == m_addIndex)
211                     lookupIndex = m_removeIndex;
212             }
213         }
214         if (m_threadIndex == 0)
215             m_shared.doneFlag.store(1, turf::Relaxed);
216         m_threadCtx.update();
217         turf::CPUTimer::Point end = turf::CPUTimer::get();
218         // ---------
219
220         stats.duration = converter.toSeconds(end - start);
221         m_stats = stats;
222     }
223 };
224
225 static const turf::extra::Option Options[] = {
226     {"readsPerWrite", 'r', true, "number of reads per write"},
227     {"itersPerChunk", 'i', true, "number of iterations per chunk"},
228     {"chunks", 'c', true, "number of chunks to execute"},
229     {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
230 };
231
232 int main(int argc, const char** argv) {
233     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
234     options.parse(argc, argv);
235     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
236     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
237     ureg chunks = options.getInteger("chunks", DefaultChunks);
238     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
239
240     turf::extra::JobDispatcher dispatcher;
241     ureg numThreads = dispatcher.getNumPhysicalCores();
242     MapAdapter adapter(numThreads);
243
244     // Create shared state and register threads
245     SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
246     std::vector<ThreadState> threads;
247     threads.reserve(numThreads);
248     for (ureg t = 0; t < numThreads; t++) {
249         u32 rangeLo = 0xffffffffu / numThreads * t + 1;
250         u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
251         threads.emplace_back(shared, t, rangeLo, rangeHi);
252     }
253     dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
254
255     {
256         // Create the map
257         MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
258         shared.map = &map;
259         dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());
260
261         printf("{\n");
262         printf("'mapType': '%s',\n", MapAdapter::MapName);
263         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
264         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
265         printf("'chunks': %d,\n", (int) chunks);
266         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
267         printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"), printf("'points': [\n");
268         for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) {
269             shared.delayFactor = delayFactor;
270
271             std::vector<ThreadState::Stats> kickTotals;
272             for (ureg c = 0; c < chunks; c++) {
273                 shared.doneFlag.storeNonatomic(false);
274                 dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());
275
276                 ThreadState::Stats kickTotal;
277                 for (ureg t = 0; t < numThreads; t++)
278                     kickTotal += threads[t].m_stats;
279                 kickTotals.push_back(kickTotal);
280             }
281
282             std::sort(kickTotals.begin(), kickTotals.end());
283             ThreadState::Stats totals;
284             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
285                 totals += kickTotals[t];
286             }
287
288             printf("    (%f, %d, %d, %f),\n", shared.delayFactor, int(totals.workUnitsDone), int(totals.mapOpsDone),
289                    totals.duration);
290         }
291         printf("],\n");
292         printf("}\n");
293
294         shared.map = NULL;
295     }
296
297     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
298     return 0;
299 }