/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing

  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction

  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
#include <cstddef> // NULL

#include <junction/Core.h>
#include <turf/Atomic.h>
#include <turf/Mutex.h>
#include <turf/ManualResetEvent.h>
#include <turf/Util.h>
#include <junction/MapTraits.h>
#include <turf/Trace.h>
#include <turf/Heap.h>
#include <junction/SimpleJobCoordinator.h>
#include <junction/QSBR.h>
30 TURF_TRACE_DECLARE(Linear, 22)
34 typedef typename Map::Hash Hash;
35 typedef typename Map::Value Value;
36 typedef typename Map::KeyTraits KeyTraits;
37 typedef typename Map::ValueTraits ValueTraits;
39 static const ureg InitialSize = 8;
40 static const ureg TableMigrationUnitSize = 32;
41 static const ureg LinearSearchLimit = 128;
42 static const ureg CellsInUseSample = LinearSearchLimit;
43 TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
44 TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
47 turf::Atomic<Hash> hash;
48 turf::Atomic<Value> value;
52 const ureg sizeMask; // a power of two minus one
53 const ureg limitNumValues;
54 turf::Atomic<sreg> cellsRemaining;
55 turf::Atomic<sreg> valuesRemaining;
56 turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
57 SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
59 Table(ureg sizeMask, ureg limitNumValues) : sizeMask(sizeMask), limitNumValues(limitNumValues),
60 cellsRemaining(limitNumValues), valuesRemaining(limitNumValues) {
63 static Table* create(ureg tableSize, ureg limitNumValues) {
64 TURF_ASSERT(turf::util::isPowerOf2(tableSize));
65 Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
66 new(table) Table(tableSize - 1, limitNumValues);
67 for (ureg j = 0; j < tableSize; j++) {
68 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
69 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
75 this->Table::~Table();
79 Cell* getCells() const {
80 return (Cell*) (this + 1);
83 ureg getNumMigrationUnits() const {
84 return sizeMask / TableMigrationUnitSize + 1;
88 class TableMigration : public SimpleJobCoordinator::Job {
92 turf::Atomic<ureg> m_sourceIndex;
94 turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
95 turf::Atomic<sreg> m_unitsRemaining;
97 TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
98 // Caller is responsible for filling in source & destination
101 virtual ~TableMigration() TURF_OVERRIDE {
102 // Destroy source table.
110 bool migrateRange(ureg startIdx);
111 virtual void run() TURF_OVERRIDE;
114 static Cell* find(Hash hash, Table* table) {
115 TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
117 TURF_ASSERT(hash != KeyTraits::NullHash);
118 ureg sizeMask = table->sizeMask;
119 for (ureg idx = hash;; idx++) {
121 Cell* cell = table->getCells() + idx;
122 // Load the hash that was there.
123 uptr probeHash = cell->hash.load(turf::Relaxed);
124 if (probeHash == hash) {
125 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
127 } else if (probeHash == KeyTraits::NullHash) {
133 // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
135 InsertResult_AlreadyFound,
136 InsertResult_InsertedNew,
137 InsertResult_Overflow
139 static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
140 TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
142 TURF_ASSERT(hash != KeyTraits::NullHash);
143 ureg sizeMask = table->sizeMask;
145 for (ureg idx = hash;; idx++) {
147 cell = table->getCells() + idx;
148 // Load the existing hash.
149 uptr probeHash = cell->hash.load(turf::Relaxed);
150 if (probeHash == hash) {
151 TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
152 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
154 if (probeHash == KeyTraits::NullHash) {
155 // It's an empty cell. Try to reserve it.
156 // But first, decrement cellsRemaining to ensure we have permission to create new getCells().
157 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
158 if (prevCellsRemaining <= 0) {
159 // Table is overpopulated.
160 TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
161 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
162 return InsertResult_Overflow;
164 // Try to reserve this cell.
165 uptr prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
166 if (prevHash == KeyTraits::NullHash) {
167 // Success. We reserved a new cell.
168 TURF_TRACE(Linear, 5, "[insert] reserved cell", prevCellsRemaining, idx);
169 return InsertResult_InsertedNew;
171 // There was a race and another thread reserved that cell from under us.
172 TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
173 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
174 if (prevHash == hash) {
175 TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
176 return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
179 // Try again in the next cell.
183 static void beginTableMigration(Map& map, Table* table) {
184 // Create new migration by DCLI.
185 TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0);
186 SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
188 TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
190 turf::LockGuard<turf::Mutex> guard(table->mutex);
191 job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
193 TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
195 // Determine new migration size and cap the number of values that can be added concurrent to the migration.
196 sreg oldValuesLimit = table->limitNumValues;
197 sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed);
198 sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining;
199 calculateNextTableSize:
200 sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2);
201 sreg nextLimitNumValues = nextTableSize * 3 / 4;
202 if (nextLimitNumValues < oldValuesLimit) {
203 // Set the new limitNumValues on the *current* table.
204 // This prevents other threads, while the migration is in progress, from concurrently
205 // re-inserting more values than the new table can hold.
206 // To set the new limitNumValues on the current table in an atomic fashion,
207 // we update its valuesRemaining via CAS loop:
209 // We must recalculate desiredValuesRemaining on each iteration of the CAS loop
210 oldValuesInUse = oldValuesLimit - oldValuesRemaining;
211 sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
212 if (desiredValuesRemaining < 0) {
213 TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, desiredValuesRemaining);
214 // Must recalculate nextTableSize. Goto, baby!
215 goto calculateNextTableSize;
217 if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, turf::Relaxed))
219 // CAS failed because table->valuesRemaining was modified by another thread.
220 // An updated value has been reloaded into oldValuesRemaining (modified by reference).
221 // Recalculate desiredValuesRemaining to account for the updated value, and try again.
222 TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining);
225 // Now we are assured that the new table will not become overpopulated during the migration process.
226 // Create new migration.
227 TableMigration* migration = new TableMigration(map);
228 migration->m_source = table;
229 migration->m_destination = Table::create(nextTableSize, nextLimitNumValues);
230 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
231 // Publish the new migration.
232 table->jobCoordinator.storeRelease(migration);
239 bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
240 ureg srcSizeMask = m_source->sizeMask;
241 ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
242 sreg valuesMigrated = 0;
243 // Iterate over source range.
244 for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
245 Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask);
248 // Fetch the srcHash and srcValue.
250 srcHash = srcCell->hash.load(turf::Relaxed);
251 if (srcHash == KeyTraits::NullHash) {
252 // An unused cell. Try to put a Redirect marker in its value.
253 srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
254 if (srcValue == Value(ValueTraits::Redirect)) {
255 // srcValue is already marked Redirect due to previous incomplete migration.
256 TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
259 if (srcValue == Value(ValueTraits::NullValue))
260 break; // Redirect has been placed. Break inner loop, continue outer loop.
261 TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
262 // Otherwise, somebody just claimed the cell. Read srcHash again...
264 // Check for deleted/uninitialized value.
265 srcValue = srcCell->value.load(turf::Relaxed);
266 if (srcValue == Value(ValueTraits::NullValue)) {
267 // Try to put a Redirect marker.
268 if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
269 break; // Redirect has been placed. Break inner loop, continue outer loop.
270 TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
273 // We've got a key/value pair to migrate.
274 // Reserve a destination cell in the destination.
275 TURF_ASSERT(srcHash != KeyTraits::NullHash);
276 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
277 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
279 InsertResult result = insert(srcHash, m_destination, dstCell);
280 // During migration, a hash can only exist in one place among all the source tables,
281 // and it is only migrated by one thread. Therefore, the hash will never already exist
282 // in the destination table:
283 TURF_ASSERT(result != InsertResult_AlreadyFound);
284 TURF_ASSERT(result != InsertResult_Overflow);
285 // Migrate the old value to the new cell.
287 // Copy srcValue to the destination.
288 dstCell->value.store(srcValue, turf::Relaxed);
289 // Try to place a Redirect marker in srcValue.
290 Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
291 TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
292 if (doubleCheckedSrcValue == srcValue) {
293 // No racing writes to the src. We've successfully placed the Redirect marker.
294 // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
295 // by a late-arriving erase.
296 if (srcValue == Value(ValueTraits::NullValue))
297 TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
302 // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
303 TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
304 srcValue = doubleCheckedSrcValue;
306 // Cell successfully migrated. Proceed to next source cell.
311 sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
312 TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
313 TURF_UNUSED(prevValuesRemaining);
314 // Range has been migrated successfully.
319 void Linear<Map>::TableMigration::run() {
320 // Conditionally increment the shared # of workers.
321 ureg probeStatus = m_workerStatus.load(turf::Relaxed);
323 if (probeStatus & 1) {
324 // End flag is already set, so do nothing.
325 TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
328 } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
329 // # of workers has been incremented, and the end flag is clear.
330 TURF_ASSERT((probeStatus & 1) == 0);
332 // Loop over all migration units in the source table.
334 if (m_workerStatus.load(turf::Relaxed) & 1) {
335 TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
338 ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
339 if (startIdx >= m_source->sizeMask + 1)
340 break; // No more migration units.
341 migrateRange(startIdx);
342 sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
343 TURF_ASSERT(prevRemaining > 0);
344 if (prevRemaining == 1) {
345 // That was the last chunk to migrate.
346 m_workerStatus.fetchOr(1, turf::Relaxed);
350 TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
353 // Decrement the shared # of workers.
354 probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
355 if (probeStatus >= 4) {
356 // There are other workers remaining. Return here so that only the very last worker will proceed.
357 TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
361 // We're the very last worker thread.
362 // Publish the new subtree.
363 TURF_ASSERT(probeStatus == 3);
364 m_map.publishTableMigration(this);
365 // End the jobCoodinator.
366 m_source->jobCoordinator.end();
368 // We're done with this TableMigration. Queue it for GC.
369 DefaultQSBR.enqueue(&TableMigration::destroy, this);
372 } // namespace details
373 } // namespace junction
375 #endif // JUNCTION_DETAILS_LINEAR_H