Changes libcds library path
[junction.git] / samples / MapPerformanceTests / MapPerformanceTests.cpp
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <turf/extra/Options.h>
19 #include <junction/extra/MapAdapter.h>
20 #include <algorithm>
21 #include <vector>
22 #include <stdio.h>
23
24 using namespace turf::intTypes;
25 typedef junction::extra::MapAdapter MapAdapter;
26
27 static const ureg NumKeysPerThread = 16384;
28 static const ureg DefaultReadsPerWrite = 19;
29 static const ureg DefaultItersPerChunk = 128;
30 static const ureg DefaultChunks = 10;
31 static const u32 Prime = 0x4190ab09;
32
33 class Delay {
34 private:
35     turf::extra::Random m_rand;
36     u32 m_threshold;
37
38 public:
39     Delay(float ratio) {
40         m_threshold = u32(double(0xffffffffu) * ratio);
41     }
42
43     void delay(ureg& workUnits) {
44         for (;;) {
45             volatile ureg v = m_rand.next32();
46             workUnits++;
47             if (v <= m_threshold)
48                 break;
49         }
50     }
51 };
52
53 struct SharedState {
54     MapAdapter& adapter;
55     MapAdapter::Map* map;
56     ureg numKeysPerThread;
57     float delayFactor;
58     ureg numThreads;
59     ureg readsPerWrite;
60     ureg itersPerChunk;
61     turf::extra::SpinKicker spinKicker;
62     turf::Atomic<u32> doneFlag;
63
64     SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
65         : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite),
66           itersPerChunk(itersPerChunk) {
67         delayFactor = 0.5f;
68         doneFlag.storeNonatomic(0);
69     }
70 };
71
72 class ThreadState {
73 public:
74     SharedState* m_shared;
75     MapAdapter::ThreadContext m_threadCtx;
76     ureg m_threadIndex;
77     u32 m_rangeLo;
78     u32 m_rangeHi;
79
80     u32 m_addIndex;
81     u32 m_removeIndex;
82
83     struct Stats {
84         ureg workUnitsDone;
85         ureg mapOpsDone;
86         double duration;
87
88         Stats() {
89             workUnitsDone = 0;
90             mapOpsDone = 0;
91             duration = 0;
92         }
93
94         Stats& operator+=(const Stats& other) {
95             workUnitsDone += other.workUnitsDone;
96             mapOpsDone += other.mapOpsDone;
97             duration += other.duration;
98             return *this;
99         }
100
101         bool operator<(const Stats& other) const {
102             return duration < other.duration;
103         }
104     };
105
106     Stats m_stats;
107
108     ThreadState(SharedState* shared, ureg threadIndex, u32 rangeLo, u32 rangeHi)
109         : m_shared(shared), m_threadCtx(shared->adapter, threadIndex) {
110         m_threadIndex = threadIndex;
111         m_rangeLo = rangeLo;
112         m_rangeHi = rangeHi;
113         m_addIndex = rangeLo;
114         m_removeIndex = rangeLo;
115     }
116
117     void registerThread() {
118         m_threadCtx.registerThread();
119     }
120
121     void unregisterThread() {
122         m_threadCtx.unregisterThread();
123     }
124
125     void initialPopulate() {
126         TURF_ASSERT(m_addIndex == m_removeIndex);
127         MapAdapter::Map* map = m_shared->map;
128         for (ureg i = 0; i < m_shared->numKeysPerThread; i++) {
129             u32 key = m_addIndex * Prime;
130             map->assign(key, (void*) (key & ~uptr(3)));
131             if (++m_addIndex == m_rangeHi)
132                 m_addIndex = m_rangeLo;
133         }
134     }
135
136     void run() {
137         MapAdapter::Map* map = m_shared->map;
138         turf::CPUTimer::Converter converter;
139         Delay delay(m_shared->delayFactor);
140         Stats stats;
141         ureg lookupIndex = m_rangeLo;
142         ureg remaining = m_shared->itersPerChunk;
143         if (m_threadIndex == 0)
144             m_shared->spinKicker.kick(m_shared->numThreads - 1);
145         else {
146             remaining = ~u32(0);
147             m_shared->spinKicker.waitForKick();
148         }
149
150         // ---------
151         turf::CPUTimer::Point start = turf::CPUTimer::get();
152         for (; remaining > 0; remaining--) {
153             // Add
154             delay.delay(stats.workUnitsDone);
155             if (m_shared->doneFlag.load(turf::Relaxed))
156                 break;
157             u32 key = m_addIndex * Prime;
158             if (key >= 2) {
159                 map->assign(key, (void*) uptr(key));
160                 stats.mapOpsDone++;
161             }
162             if (++m_addIndex == m_rangeHi)
163                 m_addIndex = m_rangeLo;
164
165             // Lookup
166             if (s32(lookupIndex - m_removeIndex) < 0)
167                 lookupIndex = m_removeIndex;
168             for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
169                 delay.delay(stats.workUnitsDone);
170                 if (m_shared->doneFlag.load(turf::Relaxed))
171                     break;
172                 key = lookupIndex * Prime;
173                 if (key >= 2) {
174                     volatile void* value = map->get(key);
175                     TURF_UNUSED(value);
176                     stats.mapOpsDone++;
177                 }
178                 if (++lookupIndex == m_rangeHi)
179                     lookupIndex = m_rangeLo;
180                 if (lookupIndex == m_addIndex)
181                     lookupIndex = m_removeIndex;
182             }
183
184             // Remove
185             delay.delay(stats.workUnitsDone);
186             if (m_shared->doneFlag.load(turf::Relaxed))
187                 break;
188             key = m_removeIndex * Prime;
189             if (key >= 2) {
190                 map->erase(key);
191                 stats.mapOpsDone++;
192             }
193             if (++m_removeIndex == m_rangeHi)
194                 m_removeIndex = m_rangeLo;
195
196             // Lookup
197             if (s32(lookupIndex - m_removeIndex) < 0)
198                 lookupIndex = m_removeIndex;
199             for (ureg l = 0; l < m_shared->readsPerWrite; l++) {
200                 delay.delay(stats.workUnitsDone);
201                 if (m_shared->doneFlag.load(turf::Relaxed))
202                     break;
203                 key = lookupIndex * Prime;
204                 if (key >= 2) {
205                     volatile void* value = map->get(key);
206                     TURF_UNUSED(value);
207                     stats.mapOpsDone++;
208                 }
209                 if (++lookupIndex == m_rangeHi)
210                     lookupIndex = m_rangeLo;
211                 if (lookupIndex == m_addIndex)
212                     lookupIndex = m_removeIndex;
213             }
214         }
215         if (m_threadIndex == 0)
216             m_shared->doneFlag.store(1, turf::Relaxed);
217         m_threadCtx.update();
218         turf::CPUTimer::Point end = turf::CPUTimer::get();
219         // ---------
220
221         stats.duration = converter.toSeconds(end - start);
222         m_stats = stats;
223     }
224 };
225
226 static const turf::extra::Option Options[] = {
227     {"readsPerWrite", 'r', true, "number of reads per write"},
228     {"itersPerChunk", 'i', true, "number of iterations per chunk"},
229     {"chunks", 'c', true, "number of chunks to execute"},
230     {"keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep"},
231 };
232
233 int main(int argc, const char** argv) {
234     turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
235     options.parse(argc, argv);
236     ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
237     ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
238     ureg chunks = options.getInteger("chunks", DefaultChunks);
239     double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
240
241     turf::extra::JobDispatcher dispatcher;
242     ureg numThreads = dispatcher.getNumPhysicalCores();
243     MapAdapter adapter(numThreads);
244
245     // Create shared state and register threads
246     SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
247     std::vector<ThreadState> threads;
248     threads.reserve(numThreads);
249     for (ureg t = 0; t < numThreads; t++) {
250         u32 rangeLo = 0xffffffffu / numThreads * t + 1;
251         u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
252         threads.push_back(ThreadState(&shared, t, rangeLo, rangeHi));
253     }
254     dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
255
256     {
257         // Create the map
258         MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
259         shared.map = &map;
260         dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());
261
262         printf("{\n");
263         printf("'mapType': '%s',\n", MapAdapter::getMapName());
264         printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
265         printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
266         printf("'chunks': %d,\n", (int) chunks);
267         printf("'keepChunkFraction': %f,\n", keepChunkFraction);
268         printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"), printf("'points': [\n");
269         for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f) {
270             shared.delayFactor = delayFactor;
271
272             std::vector<ThreadState::Stats> kickTotals;
273             for (ureg c = 0; c < chunks; c++) {
274                 shared.doneFlag.storeNonatomic(false);
275                 dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());
276
277                 ThreadState::Stats kickTotal;
278                 for (ureg t = 0; t < numThreads; t++)
279                     kickTotal += threads[t].m_stats;
280                 kickTotals.push_back(kickTotal);
281             }
282
283             std::sort(kickTotals.begin(), kickTotals.end());
284             ThreadState::Stats totals;
285             for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
286                 totals += kickTotals[t];
287             }
288
289             printf("    (%f, %d, %d, %f),\n", shared.delayFactor, int(totals.workUnitsDone), int(totals.mapOpsDone),
290                    totals.duration);
291         }
292         printf("],\n");
293         printf("}\n");
294
295         shared.map = NULL;
296     }
297
298     dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());
299     return 0;
300 }