1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
27 // Enable this to force migration overflows (for test purposes):
28 #define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
33 TURF_TRACE_DECLARE(Linear, 26)
// Dependent types taken from Map.
37 typedef typename Map::Hash Hash;
38 typedef typename Map::Value Value;
39 typedef typename Map::KeyTraits KeyTraits;
40 typedef typename Map::ValueTraits ValueTraits;
// Lower bound on the destination table size chosen by beginTableMigration().
42 static const ureg InitialSize = 8;
// Number of source cells covered by one migration unit (see getNumMigrationUnits() and migrateRange()).
43 static const ureg TableMigrationUnitSize = 32;
// Upper bound on the number of cells beginTableMigration() samples when estimating how many cells are in use.
44 static const ureg CellsInUseSample = 256;
// Hash stored in this cell. Table::create() initializes it to KeyTraits::NullHash, which find() and insert()
// treat as "cell is empty".
47 turf::Atomic<Hash> hash;
// Value stored in this cell. Table::create() initializes it to ValueTraits::NullValue; migrateRange() replaces it
// with ValueTraits::Redirect once the cell has been handled by a migration.
48 turf::Atomic<Value> value;
// Table header. The cell array lives in the same allocation, immediately after this header
// (see create() and getCells()).
52 const ureg sizeMask; // a power of two minus one
// Number of new cells insert() may still reserve before it reports overflow.
// The constructor starts it at 75% of sizeMask.
53 turf::Atomic<sreg> cellsRemaining;
54 turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
55 SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
57 Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
// Allocates the header plus tableSize cells as one TURF_HEAP block, constructs the header in place,
// and marks every cell as unused (NullHash / NullValue). tableSize must be a power of two (asserted).
60 static Table* create(ureg tableSize) {
61 TURF_ASSERT(turf::util::isPowerOf2(tableSize));
62 Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
63 new (table) Table(tableSize - 1);
64 for (ureg j = 0; j < tableSize; j++) {
65 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
66 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
// Explicit destructor call: the object was constructed with placement new in create().
72 this->Table::~Table();
// Returns the first cell, which is located directly after the Table header.
76 Cell* getCells() const {
77 return (Cell*) (this + 1);
// Number of TableMigrationUnitSize-sized units needed to cover all sizeMask + 1 cells.
80 ureg getNumMigrationUnits() const {
81 return sizeMask / TableMigrationUnitSize + 1;
// A job that moves the contents of one or more source tables into m_destination.
// It is published through a table's SimpleJobCoordinator; run() counts the worker threads that execute it.
85 class TableMigration : public SimpleJobCoordinator::Job {
// Next unclaimed start index in this source table; run() claims units from it with fetchAdd.
89 turf::Atomic<ureg> sourceIndex;
// Bit 0 is the end flag; each active worker adds 2 (see run()).
94 turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
// Set by run() when migrateRange() reports that the destination table overflowed.
95 turf::Atomic<bool> m_overflowed;
// Migration units not yet completed; the worker that completes the last one sets the end flag.
96 turf::Atomic<sreg> m_unitsRemaining;
99 TableMigration(Map& map) : m_map(map) {
// Allocates the migration header plus numSources Source entries as one TURF_HEAP block and
// resets the shared counters. The Source entries and m_destination are left for the caller to fill in.
102 static TableMigration* create(Map& map, ureg numSources) {
103 TableMigration* migration =
104 (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
105 new (migration) TableMigration(map);
106 migration->m_workerStatus.storeNonatomic(0);
107 migration->m_overflowed.storeNonatomic(false);
108 migration->m_unitsRemaining.storeNonatomic(0);
109 migration->m_numSources = numSources;
110 // Caller is responsible for filling in sources & destination
114 virtual ~TableMigration() TURF_OVERRIDE {
118 // Destroy all source tables.
// Entries that run() set to NULL were handed over to a follow-up migration and are skipped here.
119 for (ureg i = 0; i < m_numSources; i++)
120 if (getSources()[i].table)
121 getSources()[i].table->destroy();
122 // Delete the migration object itself.
// Explicit destructor call followed by a free, mirroring the alloc + placement new in create().
123 this->TableMigration::~TableMigration();
124 TURF_HEAP.free(this);
// Returns the first Source entry, which is located directly after the TableMigration header.
127 Source* getSources() const {
128 return (Source*) (this + 1);
// Migrates the unit of srcTable that starts at startIdx. run() treats a false return as destination overflow.
131 bool migrateRange(Table* srcTable, ureg startIdx);
// Entry point executed by each participating worker thread.
132 virtual void run() TURF_OVERRIDE;
// Probes the cells of `table` one after another, starting at the index derived from `hash`,
// looking for a cell whose stored hash equals `hash`. `hash` must not be KeyTraits::NullHash (asserted).
// NOTE(review): the return statements and the wrap-around of idx are not visible in this excerpt.
// Presumably the matching cell is returned, and NULL once an empty cell is reached; confirm against the full file.
135 static Cell* find(Hash hash, Table* table) {
136 TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
138 TURF_ASSERT(hash != KeyTraits::NullHash);
139 ureg sizeMask = table->sizeMask;
140 for (ureg idx = ureg(hash);; idx++) {
142 Cell* cell = table->getCells() + idx;
143 // Load the hash that was there.
144 Hash probeHash = cell->hash.load(turf::Relaxed);
145 if (probeHash == hash) {
146 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
// An empty cell ends the probe sequence.
148 } else if (probeHash == KeyTraits::NullHash) {
154 // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
// Outcome of insert(): the hash was already present, a new cell was reserved for it, or the table
// has no cellsRemaining budget left.
155 enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
// Finds or reserves the cell for `hash` in `table`, probing cells one after another from the index
// derived from `hash`. `cell` is an out-parameter that is set to the cell currently being probed,
// so on AlreadyFound / InsertedNew it refers to the cell holding `hash`.
// `hash` must not be KeyTraits::NullHash (asserted).
156 static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
157 TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
159 TURF_ASSERT(hash != KeyTraits::NullHash);
160 ureg sizeMask = table->sizeMask;
162 for (ureg idx = ureg(hash);; idx++) {
164 cell = table->getCells() + idx;
165 // Load the existing hash.
166 Hash probeHash = cell->hash.load(turf::Relaxed);
167 if (probeHash == hash) {
168 TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
169 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
171 if (probeHash == KeyTraits::NullHash) {
172 // It's an empty cell. Try to reserve it.
173 // But first, decrement cellsRemaining to ensure we have permission to create new cells.
// NOTE(review): cellsRemaining is declared as turf::Atomic<sreg>, but the previous value is stored in an s32.
// If sreg is wider than 32 bits this narrows the value; confirm that this is intended.
174 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
175 if (prevCellsRemaining <= 0) {
176 // Table is overpopulated.
177 TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
178 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
179 return InsertResult_Overflow;
181 // Try to reserve this cell.
// compareExchange returns the hash that was in the cell before the attempt; NullHash means we won.
182 Hash prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
183 if (prevHash == KeyTraits::NullHash) {
184 // Success. We reserved a new cell.
185 TURF_TRACE(Linear, 5, "[insert] reserved cell", prevCellsRemaining, idx);
186 return InsertResult_InsertedNew;
188 // There was a race and another thread reserved that cell from under us.
189 TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
190 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
191 if (prevHash == hash) {
192 TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
193 return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
196 // Try again in the next cell.
// Ensures that `table` has a migration job published in its jobCoordinator. If none exists yet,
// creates a single-source TableMigration from `table` into a freshly created table of nextTableSize cells.
// The job is checked once without the lock and again while holding table->mutex, so only one thread creates it.
200 static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
201 // Create new migration by DCLI.
202 TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
// First check, without taking the mutex.
203 SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
205 TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
// Second check, now under the mutex.
207 turf::LockGuard<turf::Mutex> guard(table->mutex);
208 job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
210 TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
212 // Create new migration.
213 TableMigration* migration = TableMigration::create(map, 1);
// One unit per TableMigrationUnitSize source cells; run() counts these down to detect completion.
214 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
215 migration->getSources()[0].table = table;
216 migration->getSources()[0].sourceIndex.storeNonatomic(0);
217 migration->m_destination = Table::create(nextTableSize);
218 // Publish the new migration.
219 table->jobCoordinator.storeRelease(migration);
// Chooses a destination size for migrating `table` and starts the migration via beginTableMigrationToSize().
// The size is derived from the fraction of sampled cells whose value is not NullValue, scaled to the whole table.
224 static void beginTableMigration(Map& map, Table* table) {
225 // Estimate number of cells in use based on a small sample.
// Sample at most CellsInUseSample cells, or the whole table if it is smaller.
227 ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
229 for (; idx < sampleSize; idx++) {
230 Cell* cell = table->getCells() + idx;
231 Value value = cell->value.load(turf::Relaxed);
232 if (value == Value(ValueTraits::Redirect)) {
233 // Another thread kicked off the jobCoordinator. The caller will participate upon return.
234 TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
237 if (value != Value(ValueTraits::NullValue))
// Scale the sampled in-use ratio up to the full table size.
240 float inUseRatio = float(inUseCells) / sampleSize;
241 float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
242 #if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
243 // Periodically underestimate the number of cells in use.
244 // This exercises the code that handles overflow during migration.
245 static ureg counter = 1;
246 if ((++counter & 3) == 0) {
// Destination holds twice the estimated in-use count, rounded up to a power of two, and never fewer
// than InitialSize cells.
250 ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
251 beginTableMigrationToSize(map, table, nextTableSize);
// Migrates one unit of srcTable, i.e. up to TableMigrationUnitSize cells starting at startIdx and clipped
// to the end of the table, into m_destination. Each source cell ends up with a Redirect marker in its value;
// cells that hold a live value are first inserted into the destination.
// run() treats a false return value as "destination table overflowed".
256 bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
257 ureg srcSizeMask = srcTable->sizeMask;
258 ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
259 sreg valuesMigrated = 0;
260 // Iterate over source range.
261 for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
262 Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
265 // Fetch the srcHash and srcValue.
267 srcHash = srcCell->hash.load(turf::Relaxed);
268 if (srcHash == KeyTraits::NullHash) {
269 // An unused cell. Try to put a Redirect marker in its value.
271 srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
272 if (srcValue == Value(ValueTraits::Redirect)) {
273 // srcValue is already marked Redirect due to previous incomplete migration.
274 TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
277 if (srcValue == Value(ValueTraits::NullValue))
278 break; // Redirect has been placed. Break inner loop, continue outer loop.
279 TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
280 // Otherwise, somebody just claimed the cell. Read srcHash again...
282 // Check for deleted/uninitialized value.
283 srcValue = srcCell->value.load(turf::Relaxed);
284 if (srcValue == Value(ValueTraits::NullValue)) {
285 // Try to put a Redirect marker.
286 if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
287 break; // Redirect has been placed. Break inner loop, continue outer loop.
288 TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
289 if (srcValue == Value(ValueTraits::Redirect)) {
290 // FIXME: I don't think this will happen. Investigate & change to assert
291 TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
294 } else if (srcValue == Value(ValueTraits::Redirect)) {
295 // srcValue is already marked Redirect due to previous incomplete migration.
296 TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
300 // We've got a key/value pair to migrate.
301 // Reserve a destination cell in the destination.
302 TURF_ASSERT(srcHash != KeyTraits::NullHash);
303 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
304 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
306 InsertResult result = insert(srcHash, m_destination, dstCell);
307 // During migration, a hash can only exist in one place among all the source tables,
308 // and it is only migrated by one thread. Therefore, the hash will never already exist
309 // in the destination table:
310 TURF_ASSERT(result != InsertResult_AlreadyFound);
311 if (result == InsertResult_Overflow) {
312 // Destination overflow.
313 // This can happen for several reasons. For example, the source table could have
314 // consisted entirely of deleted cells when it overflowed, resulting in a small destination
315 // table size, but then another thread could re-insert all the same hashes
316 // before the migration completed.
317 // Caller will cancel the current migration and begin a new one.
320 // Migrate the old value to the new cell.
322 // Copy srcValue to the destination.
323 dstCell->value.store(srcValue, turf::Relaxed);
324 // Try to place a Redirect marker in srcValue.
// compareExchange returns the value that was in the source cell before the attempt.
325 Value doubleCheckedSrcValue =
326 srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
327 TURF_ASSERT(doubleCheckedSrcValue !=
328 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
329 if (doubleCheckedSrcValue == srcValue) {
330 // No racing writes to the src. We've successfully placed the Redirect marker.
331 // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
332 // by a late-arriving erase.
333 if (srcValue == Value(ValueTraits::NullValue))
334 TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
337 // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
338 TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
339 srcValue = doubleCheckedSrcValue;
341 // Cell successfully migrated. Proceed to next source cell.
346 // Range has been migrated successfully.
// Worker entry point for a migration. Each caller registers itself in m_workerStatus, claims migration units
// from the source tables until none are left or the end flag is set, then unregisters. The last worker to
// unregister either publishes the finished migration or, after a destination overflow, starts a follow-up
// migration into a table twice the size, and finally queues this object for destruction.
// m_workerStatus layout: bit 0 is the end flag, the remaining bits count workers in steps of 2.
351 void Linear<Map>::TableMigration::run() {
352 // Conditionally increment the shared # of workers.
353 ureg probeStatus = m_workerStatus.load(turf::Relaxed);
355 if (probeStatus & 1) {
356 // End flag is already set, so do nothing.
357 TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
360 } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
361 // # of workers has been incremented, and the end flag is clear.
362 TURF_ASSERT((probeStatus & 1) == 0);
364 // Iterate over all source tables.
365 for (ureg s = 0; s < m_numSources; s++) {
366 Source& source = getSources()[s];
367 // Loop over all migration units in this source table.
369 if (m_workerStatus.load(turf::Relaxed) & 1) {
370 TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
// Claim the next unit of this source table; fetchAdd returns the start index of the claimed unit.
373 ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
374 if (startIdx >= source.table->sizeMask + 1)
375 break; // No more migration units in this table. Try next source table.
376 bool overflowed = !migrateRange(source.table, startIdx);
378 // *** FAILED MIGRATION ***
379 // TableMigration failed due to destination table overflow.
380 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
381 // hence m_unitsRemaining won't reach zero.
382 // However, multiple threads can independently detect a failed migration at the same time.
383 TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
384 // The reason we store overflowed in a shared variable is that we must flush all the worker threads before
385 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
387 // the thread that deals with it.
388 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
390 TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
391 uptr(oldOverflowed));
// Set the end flag so that the other workers stop claiming units.
392 m_workerStatus.fetchOr(1, turf::Relaxed);
// Unit completed: count it down. fetchSub returns the value before the decrement.
395 sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
396 TURF_ASSERT(prevRemaining > 0);
397 if (prevRemaining == 1) {
398 // *** SUCCESSFUL MIGRATION ***
399 // That was the last chunk to migrate.
400 m_workerStatus.fetchOr(1, turf::Relaxed);
405 TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
408 // Decrement the shared # of workers.
409 probeStatus = m_workerStatus.fetchSub(
410 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
// probeStatus is the value before the decrement: 4 or more means at least one other worker is still registered.
411 if (probeStatus >= 4) {
412 // There are other workers remaining. Return here so that only the very last worker will proceed.
413 TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
417 // We're the very last worker thread.
418 // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
// 3 == one worker (2) plus the end flag (1).
419 TURF_ASSERT(probeStatus == 3);
420 bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
422 // The migration succeeded. This is the most likely outcome. Publish the new subtree.
423 m_map.publishTableMigration(this);
424 // End the jobCoordinator.
425 getSources()[0].table->jobCoordinator.end();
427 // The migration failed due to the overflow of the destination table.
428 Table* origTable = getSources()[0].table;
429 turf::LockGuard<turf::Mutex> guard(origTable->mutex);
// Only replace the published job if it is still this migration.
430 SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
431 if (checkedJob != this) {
432 TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
// The follow-up migration gets one extra source slot for the overflowed destination table.
435 TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
436 // Double the destination table size.
437 migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
438 // Transfer source tables to the new migration.
439 for (ureg i = 0; i < m_numSources; i++) {
440 migration->getSources()[i].table = getSources()[i].table;
// Clear our entry so that destroy() does not destroy a table that now belongs to the new migration.
441 getSources()[i].table = NULL;
442 migration->getSources()[i].sourceIndex.storeNonatomic(0);
// The old destination becomes the last source of the new migration.
444 migration->getSources()[m_numSources].table = m_destination;
445 migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
446 // Calculate total number of migration units to move.
447 ureg unitsRemaining = 0;
448 for (ureg s = 0; s < migration->m_numSources; s++)
449 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
450 migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
451 // Publish the new migration.
452 origTable->jobCoordinator.storeRelease(migration);
456 // We're done with this TableMigration. Queue it for GC.
457 DefaultQSBR.enqueue(&TableMigration::destroy, this);
460 } // namespace details
461 } // namespace junction
463 #endif // JUNCTION_DETAILS_LINEAR_H