1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #include <junction/Core.h>
14 #include <turf/CPUTimer.h>
15 #include <turf/Util.h>
16 #include <turf/extra/UniqueSequence.h>
17 #include <turf/extra/JobDispatcher.h>
18 #include <junction/Averager.h>
19 #include <turf/extra/Options.h>
20 #include <junction/extra/MapAdapter.h>
22 using namespace turf::intTypes;
// Compile-time selection of which map implementation to benchmark.
23 typedef junction::extra::MapAdapter MapAdapter;
// Number of keys each worker thread cycles through (per-thread key window).
25 static const ureg NumKeysPerThread = 16384;
// Default read:write mix — 19 lookups per insert/erase pair.
26 static const ureg DefaultReadsPerWrite = 19;
// Default number of timed iterations in one measurement chunk.
27 static const ureg DefaultItersPerChunk = 128;
// Default number of chunks per delayFactor data point.
28 static const ureg DefaultChunks = 10;
// Odd 32-bit multiplier used to scatter sequential indices into
// pseudo-random keys (key = index * Prime elsewhere in this file).
29 static const u32 Prime = 0x4190ab09;
// NOTE(review): fragments of a `Delay` helper class; its class header and the
// rest of its body are elided from this view — confirm against the full file.
// Per-instance PRNG used to decide whether to burn a work unit.
33 turf::extra::Random m_rand;
// Map a [0, 1] ratio onto the full u32 range; presumably m_threshold is
// later compared against 32-bit random draws so that a draw falls below it
// with probability `ratio` — TODO confirm in the elided body.
38 m_threshold = u32(double(0xffffffffu) * ratio);
// Probabilistically performs dummy work; `workUnits` counts units done.
41 void delay(ureg& workUnits) {
// volatile so the dummy read of the PRNG isn't optimized away.
43 volatile ureg v = m_rand.next32();
// NOTE(review): fragments of `SharedState` — the struct header and several
// members (adapter, map, numThreads, readsPerWrite, itersPerChunk,
// delayFactor) are elided from this view; see ctor initializer list below.
54 ureg numKeysPerThread;
// Used by thread 0 to release the other workers simultaneously (see run()).
59 turf::extra::SpinKicker spinKicker;
// Set to 1 (relaxed) by thread 0 when its chunk finishes; polled by all
// threads so every worker stops at roughly the same time.
60 turf::Atomic<u32> doneFlag;
62 SharedState(MapAdapter& adapter, ureg numThreads, ureg numKeysPerThread, ureg readsPerWrite, ureg itersPerChunk)
63 : adapter(adapter), map(NULL), numKeysPerThread(numKeysPerThread), numThreads(numThreads), readsPerWrite(readsPerWrite), itersPerChunk(itersPerChunk) {
// Non-atomic store is fine here: no other threads exist yet.
65 doneFlag.storeNonatomic(0);
// NOTE(review): fragments of `ThreadState` — the class header and some
// members (m_threadIndex, m_rangeLo, m_rangeHi, m_stats) are elided here.
71 SharedState& m_shared;
// Per-thread handle into the map implementation (QSBR-style registration).
72 MapAdapter::ThreadContext m_threadCtx;
// Accumulate another thread's (or chunk's) stats into this one.
91 Stats& operator+=(const Stats& other) {
92 workUnitsDone += other.workUnitsDone;
93 mapOpsDone += other.mapOpsDone;
94 duration += other.duration;
// Order chunks by elapsed time so the fastest fraction can be kept (main()).
98 bool operator<(const Stats& other) const {
99 return duration < other.duration;
// [rangeLo, rangeHi) is this thread's private slice of the 32-bit key-index
// space; add/remove cursors both start at the low edge.
105 ThreadState(SharedState& shared, ureg threadIndex, u32 rangeLo, u32 rangeHi) : m_shared(shared), m_threadCtx(shared.adapter, threadIndex) {
106 m_threadIndex = threadIndex;
109 m_addIndex = rangeLo;
110 m_removeIndex = rangeLo;
// Thin wrappers so JobDispatcher can invoke per-thread map (un)registration
// as member-function jobs (see kickMulti calls in main()).
113 void registerThread() {
114 m_threadCtx.registerThread();
117 void unregisterThread() {
118 m_threadCtx.unregisterThread();
// Pre-fill the map with this thread's share of keys so the timed phase
// starts from a steady-state population of numKeysPerThread entries/thread.
121 void initialPopulate() {
// Cursors must coincide: nothing inserted, nothing pending removal.
122 TURF_ASSERT(m_addIndex == m_removeIndex);
123 MapAdapter::Map *map = m_shared.map;
124 for (ureg i = 0; i < m_shared.numKeysPerThread; i++) {
// Scramble the sequential index into a well-distributed key.
125 u32 key = m_addIndex * Prime;
// Value is the key with the low 2 bits cleared (valid non-sentinel pointer).
126 map->insert(key, (void*) (key & ~uptr(3)));
// Wrap the cursor within this thread's private index range.
127 if (++m_addIndex == m_rangeHi)
128 m_addIndex = m_rangeLo;
// NOTE(review): body of the per-thread timed benchmark loop; the enclosing
// run() signature and several statements (e.g. the `break`s after each
// doneFlag poll, the map->erase call, and local `stats` setup) are elided
// from this view — confirm against the full file before relying on details.
133 MapAdapter::Map *map = m_shared.map;
134 turf::CPUTimer::Converter converter;
135 Delay delay(m_shared.delayFactor);
// Lookups chase the live window of keys, starting at the range base.
137 ureg lookupIndex = m_rangeLo;
138 ureg remaining = m_shared.itersPerChunk;
// Thread 0 releases the other workers; everyone else spins until kicked,
// so all threads enter the timed region at roughly the same instant.
139 if (m_threadIndex == 0)
140 m_shared.spinKicker.kick(m_shared.numThreads - 1);
143 m_shared.spinKicker.waitForKick();
147 turf::CPUTimer::Point start = turf::CPUTimer::get();
148 for (; remaining > 0; remaining--) {
// Optional artificial work between map ops (tunes contention level).
150 delay.delay(stats.workUnitsDone);
// Relaxed poll: exact stop point doesn't matter, only that all threads
// stop soon after thread 0 finishes (presumably followed by a `break`).
151 if (m_shared.doneFlag.load(turf::Relaxed))
// --- one insert ---
153 u32 key = m_addIndex * Prime;
155 map->insert(key, (void*) uptr(key));
158 if (++m_addIndex == m_rangeHi)
159 m_addIndex = m_rangeLo;
// Keep the lookup cursor at or ahead of the remove cursor; the signed
// cast makes the comparison safe across index wraparound.
162 if (s32(lookupIndex - m_removeIndex) < 0)
163 lookupIndex = m_removeIndex;
// --- readsPerWrite lookups after the insert ---
164 for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
165 delay.delay(stats.workUnitsDone);
166 if (m_shared.doneFlag.load(turf::Relaxed))
168 key = lookupIndex * Prime;
// volatile forces the read; the fetched value is otherwise unused.
170 volatile void* value = map->get(key);
174 if (++lookupIndex == m_rangeHi)
175 lookupIndex = m_rangeLo;
// Don't read past the insert frontier; restart at the remove cursor.
176 if (lookupIndex == m_addIndex)
177 lookupIndex = m_removeIndex;
// --- one remove (the erase call itself is elided from this view) ---
181 delay.delay(stats.workUnitsDone);
182 if (m_shared.doneFlag.load(turf::Relaxed))
184 key = m_removeIndex * Prime;
189 if (++m_removeIndex == m_rangeHi)
190 m_removeIndex = m_rangeLo;
// Same wraparound-safe clamp as above, after the remove cursor advanced.
193 if (s32(lookupIndex - m_removeIndex) < 0)
194 lookupIndex = m_removeIndex;
// --- readsPerWrite lookups after the remove ---
195 for (ureg l = 0; l < m_shared.readsPerWrite; l++) {
196 delay.delay(stats.workUnitsDone);
197 if (m_shared.doneFlag.load(turf::Relaxed))
199 key = lookupIndex * Prime;
201 volatile void* value = map->get(key);
205 if (++lookupIndex == m_rangeHi)
206 lookupIndex = m_rangeLo;
207 if (lookupIndex == m_addIndex)
208 lookupIndex = m_removeIndex;
// First thread to finish its quota tells everyone else to stop.
211 if (m_threadIndex == 0)
212 m_shared.doneFlag.store(1, turf::Relaxed);
// Let the map implementation do end-of-interval housekeeping (e.g. QSBR).
213 m_threadCtx.update();
214 turf::CPUTimer::Point end = turf::CPUTimer::get();
217 stats.duration = converter.toSeconds(end - start);
// Command-line options table: {long name, short flag, takes-argument, help}.
// (The closing `};` of this array is elided from this view.)
222 static const turf::extra::Option Options[] = {
223 { "readsPerWrite", 'r', true, "number of reads per write" },
224 { "itersPerChunk", 'i', true, "number of iterations per chunk" },
225 { "chunks", 'c', true, "number of chunks to execute" },
226 { "keepChunkFraction", 'k', true, "threshold fraction of chunk timings to keep" },
// Benchmark driver: parse options, spin up one worker per physical core,
// pre-populate the map, then sweep delayFactor downward, timing `chunks`
// measurement chunks per point and printing Python-literal-style results.
// NOTE(review): several statements (e.g. shared.map assignment, loop braces,
// delayFactor/duration printf arguments) are elided from this view.
229 int main(int argc, const char** argv) {
230 turf::extra::Options options(Options, TURF_STATIC_ARRAY_SIZE(Options));
231 options.parse(argc, argv);
232 ureg readsPerWrite = options.getInteger("readsPerWrite", DefaultReadsPerWrite);
233 ureg itersPerChunk = options.getInteger("itersPerChunk", DefaultItersPerChunk);
234 ureg chunks = options.getInteger("chunks", DefaultChunks);
// Fraction of (time-sorted) chunks to keep when averaging; 1.0 keeps all.
235 double keepChunkFraction = options.getDouble("keepChunkFraction", 1.0);
237 turf::extra::JobDispatcher dispatcher;
238 ureg numThreads = dispatcher.getNumPhysicalCores();
239 MapAdapter adapter(numThreads);
241 // Create shared state and register threads
242 SharedState shared(adapter, numThreads, NumKeysPerThread, readsPerWrite, itersPerChunk);
243 std::vector<ThreadState> threads;
244 threads.reserve(numThreads);
245 for (ureg t = 0; t < numThreads; t++) {
// Partition the 32-bit index space into per-thread half-open slices,
// offset by 1 so index 0 (key 0 may be a reserved sentinel) is skipped.
246 u32 rangeLo = 0xffffffffu / numThreads * t + 1;
247 u32 rangeHi = 0xffffffffu / numThreads * (t + 1) + 1;
248 threads.emplace_back(shared, t, rangeLo, rangeHi);
250 dispatcher.kickMulti(&ThreadState::registerThread, &threads[0], threads.size());
// Size the map for the total steady-state population up front.
254 MapAdapter::Map map(MapAdapter::getInitialCapacity(numThreads * NumKeysPerThread));
256 dispatcher.kickMulti(&ThreadState::initialPopulate, &threads[0], threads.size());
// Emit results as a Python dict literal for downstream plotting scripts.
259 printf("'mapType': '%s',\n", MapAdapter::MapName);
260 printf("'readsPerWrite': %d,\n", (int) readsPerWrite);
261 printf("'itersPerChunk': %d,\n", (int) itersPerChunk);
262 printf("'chunks': %d,\n", (int) chunks);
263 printf("'keepChunkFraction': %f,\n", keepChunkFraction);
// NOTE(review): this line ends in `,` not `;` — it compiles only because the
// comma operator chains it onto the next printf. Almost certainly a typo;
// should end with a semicolon.
264 printf("'labels': ('delayFactor', 'workUnitsDone', 'mapOpsDone', 'totalTime'),\n"),
265 printf("'points': [\n");
// Sweep the artificial-delay factor from 1.0 down toward 0 (~5% steps).
266 for (float delayFactor = 1.f; delayFactor >= 0.0005f; delayFactor *= 0.95f)
268 shared.delayFactor = delayFactor;
270 std::vector<ThreadState::Stats> kickTotals;
271 for (ureg c = 0; c < chunks; c++) {
// Re-arm the stop flag before each chunk (single-threaded here).
272 shared.doneFlag.storeNonatomic(false);
273 dispatcher.kickMulti(&ThreadState::run, &threads[0], threads.size());
// Sum all threads' stats for this chunk.
275 ThreadState::Stats kickTotal;
276 for (ureg t = 0; t < numThreads; t++)
277 kickTotal += threads[t].m_stats;
278 kickTotals.push_back(kickTotal);
// Sort chunks by duration and keep only the fastest keepChunkFraction,
// discarding slow outliers (scheduler noise, page faults, etc.).
281 std::sort(kickTotals.begin(), kickTotals.end());
282 ThreadState::Stats totals;
283 for (ureg t = 0; t < ureg(kickTotals.size() * keepChunkFraction); t++) {
284 totals += kickTotals[t];
// One tuple per data point (delayFactor/duration args elided from view).
287 printf("    (%f, %d, %d, %f),\n",
289 int(totals.workUnitsDone),
290 int(totals.mapOpsDone),
// Let the map implementation's per-thread contexts tear down cleanly.
299 dispatcher.kickMulti(&ThreadState::unregisterThread, &threads[0], threads.size());