Delete unused junction/Averager.h
[junction.git] / samples / MapPerformanceTests / MapPerformanceTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <turf/extra/Options.h>
19 #include <junction/extra/MapAdapter.h>
20 #include <algorithm>
21 #include <vector>
22
23 using namespace turf::intTypes;
24 typedef junction::extra::MapAdapter MapAdapter;
25
26 static const ureg NumKeysPerThread = 16384;
27 static const ureg DefaultReadsPerWrite = 19;
28 static const ureg DefaultItersPerChunk = 128;
29 static const ureg DefaultChunks = 10;
30 static const u32 Prime = 0x4190ab09;
31
32 class Delay {
33 private:
34     turf::extra::Random m_rand;
35     u32 m_threshold;
36
37 public:
38     Delay(float ratio) {
39         m_threshold = u32(double(0xffffffffu) * ratio);
40     }
41
42     void delay(ureg& workUnits) {
43         for (;;) {
44             volatile ureg v = m_rand.next32();
45             workUnits++;
46             if (v <= m_threshold)
47                 break;
48         }
49     }
50 };
51
52 struct SharedState {
53     MapAdapter& adapter;
54     MapAdapter::Map* map;
55     ureg numKeysPerThread;
56     float delayFactor;
57     ureg numThreads;
58     ureg readsPerWrite;
59     ureg itersPerChunk;
60     turf::extra::SpinKicker spinKicker;
61     turf::Atomic<u32> doneFlag;
62
63     SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
64         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
65         delayFactor = 0.5f;
66         doneFlag.storeNonatomic(0);
67     }
68 };
69
70 class ThreadState {
71 public:
72     SharedState& m_shared;
73     MapAdapter::ThreadContext m_threadCtx;
74     ureg m_threadIndex;
75     u32 m_rangeLo;
76     u32 m_rangeHi;
77
78     u32 m_addIndex;
79     u32 m_removeIndex;
80
81     struct Stats {
82         ureg workUnitsDone;
83         ureg mapOpsDone;
84         double duration;
85
86         Stats() {
87             workUnitsDone = 0;
88             mapOpsDone = 0;
89             duration = 0;
90         }
91
92         Stats& operator+=(const Stats& other) {
93             workUnitsDone += other.workUnitsDone;
94             mapOpsDone += other.mapOpsDone;
95             duration += other.duration;
96             return *this;
97         }
98
99         bool operator<(const Stats& other) const {
100             return duration < other.duration;
101         }
102     };
103
104     Stats m_stats;
105
106     ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
107         m_threadIndex = threadIndex;
108         m_rangeLo = rangeLo;
109         m_rangeHi = rangeHi;
110         m_addIndex = rangeLo;
111         m_removeIndex = rangeLo;
112     }
113
114     void registerThread() {
115         m_threadCtx.registerThread();
116     }
117
118     void unregisterThread() {
119         m_threadCtx.unregisterThread();
120     }
121
122     void initialPopulate() {
123         TURF_ASSERT(m_addIndex == m_removeIndex);
124         MapAdapter::Map *map = m_shared.map;
125         for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
126             u32 key = m_addIndex * Prime;
127             map->insert(key, (void*) (key & ~uptr(3)));
128             if (++m_addIndex == m_rangeHi)
129                 m_addIndex = m_rangeLo;
130         }
131     }
132
133     void run() {
134         MapAdapter::Map *map = m_shared.map;
135         turf::CPUTimer::Converter converter;
136         Delay delay(m_shared.delayFactor);
137         Stats stats;
138         ureg lookupIndex = m_rangeLo;
139         ureg remaining = m_shared.itersPerChunk;
140         if (m_threadIndex == 0)
141             m_shared.spinKicker.kick(m_shared.numThreads - 1);
142         else {
143             remaining = ~u32(0);
144             m_shared.spinKicker.waitForKick();
145         }
146
147         // ---------
148         turf::CPUTimer::Point start = turf::CPUTimer::get();
149         for (; remaining > 0; remaining--) {
150             // Add
151             delay.delay(stats.workUnitsDone);
152             if (m_shared.doneFlag.load(turf::Relaxed))
153                 break;
154             u32 key = m_addIndex * Prime;
155             if (key >= 2) {
156                 map->insert(key, (void*) uptr(key));
157                 stats.mapOpsDone++;
158             }
159             if (++m_addIndex == m_rangeHi)
160                 m_addIndex = m_rangeLo;
161
162             // Lookup
163             if (s32(lookupIndex - m_removeIndex) < 0)
164                 lookupIndex = m_removeIndex;
165             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
166                 delay.delay(stats.workUnitsDone);
167                 if (m_shared.doneFlag.load(turf::Relaxed))
168                     break;
169                 key = lookupIndex * Prime;
170                 if (key >= 2) {
171                     volatile void* value = map->get(key);
172                     TURF_UNUSED(value);
173                     stats.mapOpsDone++;
174                 }
175                 if (++lookupIndex == m_rangeHi)
176                     lookupIndex = m_rangeLo;
177                 if (lookupIndex == m_addIndex)
178                     lookupIndex = m_removeIndex;
179             }
180
181             // Remove
182             delay.delay(stats.workUnitsDone);
183             if (m_shared.doneFlag.load(turf::Relaxed))
184                 break;
185             key = m_removeIndex * Prime;
186             if (key >= 2) {
187                 map->erase(key);
188                 stats.mapOpsDone++;
189             }
190             if (++m_removeIndex == m_rangeHi)
191                 m_removeIndex = m_rangeLo;
192
193             // Lookup
194             if (s32(lookupIndex - m_removeIndex) < 0)
195                 lookupIndex = m_removeIndex;
196             for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
197                 delay.delay(stats.workUnitsDone);
198                 if (m_shared.doneFlag.load(turf::Relaxed))
199                     break;
200                 key = lookupIndex * Prime;
201                 if (key >= 2) {
202                     volatile void* value = map->get(key);
203                     TURF_UNUSED(value);
204                     stats.mapOpsDone++;
205                 }
206                 if (++lookupIndex == m_rangeHi)
207                     lookupIndex = m_rangeLo;
208                 if (lookupIndex == m_addIndex)
209                     lookupIndex = m_removeIndex;
210             }
211         }
212         if (m_threadIndex == 0)
213             m_shared.doneFlag.store(1, turf::Relaxed);
214         m_threadCtx.update();
215         turf::CPUTimer::Point end = turf::CPUTimer::get();
216         // ---------
217
218         stats.duration = converter.toSeconds(end - start);
219         m_stats = stats;
220     }
221 };
222
223 static const turf::extra::Option Options[] = {
224     { "readsPerWrite", 'r', true, "number of reads per write" },
225     { "itersPerChunk", 'i', true, "number of iterations per chunk" },
226     { "chunks", 'c', true, "number of chunks to execute" },
227     { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
228 };
229
230 int main(int argc, const char** argv) {
231     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
232     options.parse(argc, argv);
233     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
234     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
235     ureg chunks = options.getInteger("chunks", DefaultChunks);
236     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
237
238     turf::extra::JobDispatcher dispatcher;
239     ureg numThreads = dispatcher.getNumPhysicalCores();
240     MapAdapter adapter(numThreads);
241
242     // Create shared state and register threads
243     SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
244     std::vector<ThreadState> threads;
245     threads.reserve(numThreads);
246     for (ureg t = 0; t < numThreads; t++) {
247         u32 rangeLo = 0xffffffffu / numThreads * t + 1;
248         u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
249         threads.emplace_back(shared, t, rangeLo, rangeHi);
250     }
251     dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
252
253     {
254         // Create the map
255         MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
256         shared.map = &map;
257         dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());
258
259         printf("{\n");
260         printf("'mapType': '%s',\n", MapAdapter::MapName);
261         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
262         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
263         printf("'chunks': %d,\n", (int) chunks);
264         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
265         printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"),
266         printf("'points': [\n");
267         for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f)
268         {
269             shared.delayFactor = delayFactor;
270
271             std::vector<ThreadState::Stats> kickTotals;
272             for (ureg c = 0; c < chunks; c++) {
273                 shared.doneFlag.storeNonatomic(false);
274                 dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());
275
276                 ThreadState::Stats kickTotal;
277                 for (ureg t = 0; t < numThreads; t++)
278                     kickTotal += threads[t].m_stats;
279                 kickTotals.push_back(kickTotal);
280             }
281
282             std::sort(kickTotals.begin(), kickTotals.end());
283             ThreadState::Stats totals;
284             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
285                 totals += kickTotals[t];
286             }
287
288             printf("    (%f, %d, %d, %f),\n",
289                 shared.delayFactor,
290                 int(totals.workUnitsDone),
291                 int(totals.mapOpsDone),
292                 totals.duration);
293         }
294         printf("],\n");
295         printf("}\n");
296
297         shared.map = NULL;
298     }
299
300     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
301     return 0;
302 }