4c9b91e9e6c6b477303944cbec15f523572f276b
[junction.git] / samples / MapPerformanceTests / MapPerformanceTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <junction/Averager.h>
19 #include <turf/extra/Options.h>
20 #include <junction/extra/MapAdapter.h>
21
22 using namespace turf::intTypes;
23 typedef junction::extra::MapAdapter MapAdapter;
24
25 static const ureg NumKeysPerThread = 16384;
26 static const ureg DefaultReadsPerWrite = 19;
27 static const ureg DefaultItersPerChunk = 128;
28 static const ureg DefaultChunks = 10;
29 static const u32 Prime = 0x4190ab09;
30
31 class Delay {
32 private:
33     turf::extra::Random m_rand;
34     u32 m_threshold;
35
36 public:
37     Delay(float ratio) {
38         m_threshold = u32(double(0xffffffffu) * ratio);
39     }
40
41     void delay(ureg& workUnits) {
42         for (;;) {
43             volatile ureg v = m_rand.next32();
44             workUnits++;
45             if (v <= m_threshold)
46                 break;
47         }
48     }
49 };
50
51 struct SharedState {
52     MapAdapter& adapter;
53     MapAdapter::Map* map;
54     ureg numKeysPerThread;
55     float delayFactor;
56     ureg numThreads;
57     ureg readsPerWrite;
58     ureg itersPerChunk;
59     turf::extra::SpinKicker spinKicker;
60     turf::Atomic<u32> doneFlag;
61
62     SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
63         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
64         delayFactor = 0.5f;
65         doneFlag.storeNonatomic(0);
66     }
67 };
68
69 class ThreadState {
70 public:
71     SharedState& m_shared;
72     MapAdapter::ThreadContext m_threadCtx;
73     ureg m_threadIndex;
74     u32 m_rangeLo;
75     u32 m_rangeHi;
76
77     u32 m_addIndex;
78     u32 m_removeIndex;
79
80     struct Stats {
81         ureg workUnitsDone;
82         ureg mapOpsDone;
83         double duration;
84
85         Stats() {
86             workUnitsDone = 0;
87             mapOpsDone = 0;
88             duration = 0;
89         }
90
91         Stats& operator+=(const Stats& other) {
92             workUnitsDone += other.workUnitsDone;
93             mapOpsDone += other.mapOpsDone;
94             duration += other.duration;
95             return *this;
96         }
97
98         bool operator<(const Stats& other) const {
99             return duration < other.duration;
100         }
101     };
102
103     Stats m_stats;
104
105     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
106         m_threadIndex = threadIndex;
107         m_rangeLo = rangeLo;
108         m_rangeHi = rangeHi;
109         m_addIndex = rangeLo;
110         m_removeIndex = rangeLo;
111     }
112
113     void registerThread() {
114         m_threadCtx.registerThread();
115     }
116
117     void unregisterThread() {
118         m_threadCtx.unregisterThread();
119     }
120
121     void initialPopulate() {
122         TURF_ASSERT(m_addIndex == m_removeIndex);
123         MapAdapter::Map *map = m_shared.map;
124         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
125             u32 key = m_addIndex * Prime;
126             map->insert(key, (void*) (key & ~uptr(3)));
127             if (++m_addIndex == m_rangeHi)
128                 m_addIndex = m_rangeLo;
129         }
130     }
131
132     void run() {
133         MapAdapter::Map *map = m_shared.map;
134         turf::CPUTimer::Converter converter;
135         Delay delay(m_shared.delayFactor);
136         Stats stats;
137         ureg lookupIndex = m_rangeLo;
138         ureg remaining = m_shared.itersPerChunk;
139         if (m_threadIndex == 0)
140             m_shared.spinKicker.kick(m_shared.numThreads - 1);
141         else {
142             remaining = ~u32(0);
143             m_shared.spinKicker.waitForKick();
144         }
145
146         // ---------
147         turf::CPUTimer::Point start = turf::CPUTimer::get();
148         for (; remaining > 0; remaining--) {
149             // Add
150             delay.delay(stats.workUnitsDone);
151             if (m_shared.doneFlag.load(turf::Relaxed))
152                 break;
153             u32 key = m_addIndex * Prime;
154             if (key >= 2) {
155                 map->insert(key, (void*) uptr(key));
156                 stats.mapOpsDone++;
157             }
158             if (++m_addIndex == m_rangeHi)
159                 m_addIndex = m_rangeLo;
160
161             // Lookup
162             if (s32(lookupIndex - m_removeIndex) < 0)
163                 lookupIndex = m_removeIndex;
164             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
165                 delay.delay(stats.workUnitsDone);
166                 if (m_shared.doneFlag.load(turf::Relaxed))
167                     break;
168                 key = lookupIndex * Prime;
169                 if (key >= 2) {
170                     volatile void* value = map->get(key);
171                     TURF_UNUSED(value);
172                     stats.mapOpsDone++;
173                 }
174                 if (++lookupIndex == m_rangeHi)
175                     lookupIndex = m_rangeLo;
176                 if (lookupIndex == m_addIndex)
177                     lookupIndex = m_removeIndex;
178             }
179
180             // Remove
181             delay.delay(stats.workUnitsDone);
182             if (m_shared.doneFlag.load(turf::Relaxed))
183                 break;
184             key = m_removeIndex * Prime;
185             if (key >= 2) {
186                 map->erase(key);
187                 stats.mapOpsDone++;
188             }
189             if (++m_removeIndex == m_rangeHi)
190                 m_removeIndex = m_rangeLo;
191
192             // Lookup
193             if (s32(lookupIndex - m_removeIndex) < 0)
194                 lookupIndex = m_removeIndex;
195             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
196                 delay.delay(stats.workUnitsDone);
197                 if (m_shared.doneFlag.load(turf::Relaxed))
198                     break;
199                 key = lookupIndex * Prime;
200                 if (key >= 2) {
201                     volatile void* value = map->get(key);
202                     TURF_UNUSED(value);
203                     stats.mapOpsDone++;
204                 }
205                 if (++lookupIndex == m_rangeHi)
206                     lookupIndex = m_rangeLo;
207                 if (lookupIndex == m_addIndex)
208                     lookupIndex = m_removeIndex;
209             }
210         }
211         if (m_threadIndex == 0)
212             m_shared.doneFlag.store(1, turf::Relaxed);
213         m_threadCtx.update();
214         turf::CPUTimer::Point end = turf::CPUTimer::get();
215         // ---------
216
217         stats.duration = converter.toSeconds(end - start);
218         m_stats = stats;
219     }
220 };
221
222 static const turf::extra::Option Options[] = {
223     { "readsPerWrite", 'r', true, "number of reads per write" },
224     { "itersPerChunk", 'i', true, "number of iterations per chunk" },
225     { "chunks", 'c', true, "number of chunks to execute" },
226     { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
227 };
228
229 int main(int argc, const char** argv) {
230     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
231     options.parse(argc, argv);
232     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
233     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
234     ureg chunks = options.getInteger("chunks", DefaultChunks);
235     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
236
237     turf::extra::JobDispatcher dispatcher;
238     ureg numThreads = dispatcher.getNumPhysicalCores();
239     MapAdapter adapter(numThreads);
240
241     // Create shared state and register threads
242     SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
243     std::vector<ThreadState> threads;
244     threads.reserve(numThreads);
245     for (ureg t = 0; t < numThreads; t++) {
246         u32 rangeLo = 0xffffffffu / numThreads * t + 1;
247         u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
248         threads.emplace_back(shared, t, rangeLo, rangeHi);
249     }
250     dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
251
252     {
253         // Create the map
254         MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
255         shared.map = &map;
256         dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());
257
258         printf("{\n");
259         printf("'mapType': '%s',\n", MapAdapter::MapName);
260         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
261         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
262         printf("'chunks': %d,\n", (int) chunks);
263         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
264         printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"),
265         printf("'points': [\n");
266         for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f)
267         {
268             shared.delayFactor = delayFactor;
269
270             std::vector<ThreadState::Stats> kickTotals;
271             for (ureg c = 0; c < chunks; c++) {
272                 shared.doneFlag.storeNonatomic(false);
273                 dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());
274
275                 ThreadState::Stats kickTotal;
276                 for (ureg t = 0; t < numThreads; t++)
277                     kickTotal += threads[t].m_stats;
278                 kickTotals.push_back(kickTotal);
279             }
280
281             std::sort(kickTotals.begin(), kickTotals.end());
282             ThreadState::Stats totals;
283             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
284                 totals += kickTotals[t];
285             }
286
287             printf("    (%f, %d, %d, %f),\n",
288                 shared.delayFactor,
289                 int(totals.workUnitsDone),
290                 int(totals.mapOpsDone),
291                 totals.duration);
292         }
293         printf("],\n");
294         printf("}\n");
295
296         shared.map = NULL;
297     }
298
299     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
300     return 0;
301 }