From: Jeff Preshing Date: Thu, 11 Feb 2016 17:27:24 +0000 (-0500) Subject: Improve ConcurrentMap_Linear scalability X-Git-Url: http://plrg.eecs.uci.edu/git/?p=junction.git;a=commitdiff_plain;h=c7fafdd7d1e574307901bb196685acf787ca23e8 Improve ConcurrentMap_Linear scalability Removed shared variable Table::valuesRemaining. Instead, detect and handle overflows during migration, like ConcurrentMap_LeapFrog. Added #defines to force migration overflows during testing: JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 6 cores runs ~57% faster on Win32. --- diff --git a/junction/ConcurrentMap_LeapFrog.h b/junction/ConcurrentMap_LeapFrog.h index a9dc4a3..389ce40 100644 --- a/junction/ConcurrentMap_LeapFrog.h +++ b/junction/ConcurrentMap_LeapFrog.h @@ -251,7 +251,7 @@ public: // Lookup without creating a temporary Mutator. Value get(Key key) { Hash hash = KeyTraits::hash(key); - TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(hash), 0); + TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(this), uptr(hash)); for (;;) { typename Details::Table* table = m_root.load(turf::Consume); typename Details::Cell* cell = Details::find(hash, table); diff --git a/junction/ConcurrentMap_Linear.cpp b/junction/ConcurrentMap_Linear.cpp index 41fbb1e..90819e2 100644 --- a/junction/ConcurrentMap_Linear.cpp +++ b/junction/ConcurrentMap_Linear.cpp @@ -14,13 +14,12 @@ namespace junction { -TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 18) // autogenerated by TidySource.py +TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 17) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[Mutator] find constructor called") TURF_TRACE_DEFINE("[Mutator] find was redirected") TURF_TRACE_DEFINE("[Mutator] insert constructor called") TURF_TRACE_DEFINE("[Mutator] insert was redirected") TURF_TRACE_DEFINE("[Mutator::exchangeValue] called") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] ran out of valuesRemaining") TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value") TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value") TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value") @@ -33,6 +32,6 @@ TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected") TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected") TURF_TRACE_DEFINE("[get] called") TURF_TRACE_DEFINE("[get] was redirected") -TURF_TRACE_DEFINE_END(ConcurrentMap_Linear, 18) +TURF_TRACE_DEFINE_END(ConcurrentMap_Linear, 17) } // namespace junction diff --git a/junction/ConcurrentMap_Linear.h b/junction/ConcurrentMap_Linear.h index 4cf349d..bf6c1ea 100644 --- a/junction/ConcurrentMap_Linear.h +++ b/junction/ConcurrentMap_Linear.h @@ -21,7 +21,7 @@ namespace junction { -TURF_TRACE_DECLARE(ConcurrentMap_Linear, 18) +TURF_TRACE_DECLARE(ConcurrentMap_Linear, 17) template , class VT = DefaultValueTraits > class ConcurrentMap_Linear { @@ -37,9 +37,7 @@ private: turf::Atomic m_root; public: - ConcurrentMap_Linear(ureg capacity = Details::InitialSize) { - ureg limitNumValues = capacity * 3 / 4; - m_root.storeNonatomic(Details::Table::create(capacity, limitNumValues)); + ConcurrentMap_Linear(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) { } ~ConcurrentMap_Linear() { @@ -53,7 +51,7 @@ public: // There are no racing calls to this function. 
typename Details::Table* oldRoot = m_root.loadNonatomic(); m_root.store(migration->m_destination, turf::Release); - TURF_ASSERT(oldRoot == migration->m_source); + TURF_ASSERT(oldRoot == migration->getSources()[0].table); // Caller will GC the TableMigration and the source table. } @@ -87,7 +85,7 @@ public: if (m_value != Value(ValueTraits::Redirect)) return; // Found an existing value // We've encountered a Redirect value. Help finish the migration. - TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0)); + TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), 0); m_table->jobCoordinator.participate(); // Try again using the latest root. } @@ -138,52 +136,27 @@ public: TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); for (;;) { Value oldValue = m_value; - s32 prevRemainingValues = s32(-1); - if (oldValue == Value(ValueTraits::NullValue)) { - // It's a deleted or newly initialized cell. - // Decrement remainingValues to ensure we have permission to (re)insert a value. - prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed); - if (prevRemainingValues <= 0) { - TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table), - prevRemainingValues); - // Can't (re)insert any more values. - // There are two ways this can happen. One with a TableMigration already in progress, and one without: - // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert. - // 2. Two threads race to insert the same key, and it's the last free cell in the table. - // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration - // failure, - // as LeapFrog and Grampa do...) - m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement - // Since we don't know whether there's already a TableMigration in progress, always attempt to start one - // here: - Details::beginTableMigration(m_map, m_table); - goto helpMigrate; - } - } if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { // Exchange was successful. Return previous value. - TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); + TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); Value result = m_value; m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value)); if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value", + TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value)); - m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. return desired; } // We've encountered a Redirect value. Help finish the migration. 
- TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value)); - helpMigrate: - // Help migrate to new table. + TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value)); Hash hash = m_cell->hash.load(turf::Relaxed); for (;;) { // Help complete the migration. @@ -195,7 +168,7 @@ public: case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(turf::Consume); if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value)); break; } @@ -203,8 +176,7 @@ public: case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), - 0); + TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0); Details::beginTableMigration(m_map, m_table); break; } @@ -221,7 +193,7 @@ public: Value eraseValue() { TURF_ASSERT(m_cell); // Cell must have been found or inserted - TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells()); + TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells()); for (;;) { if (m_value == Value(ValueTraits::NullValue)) return Value(m_value); @@ -229,13 +201,12 @@ public: if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. - m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); Value result = m_value; m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), m_cell - m_table->getCells()); if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. @@ -243,7 +214,7 @@ public: return Value(ValueTraits::NullValue); } // We've been redirected to a new table. - TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table), + TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), m_cell - m_table->getCells()); Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash for (;;) { @@ -259,7 +230,7 @@ public: m_value = m_cell->value.load(turf::Relaxed); if (m_value != Value(ValueTraits::Redirect)) break; - TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table), + TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), m_cell - m_table->getCells()); } } @@ -277,7 +248,7 @@ public: // Lookup without creating a temporary Mutator. 
Value get(Key key) { Hash hash = KeyTraits::hash(key); - TURF_TRACE(ConcurrentMap_Linear, 16, "[get] called", uptr(this), uptr(hash)); + TURF_TRACE(ConcurrentMap_Linear, 15, "[get] called", uptr(this), uptr(hash)); for (;;) { typename Details::Table* table = m_root.load(turf::Consume); typename Details::Cell* cell = Details::find(hash, table); @@ -287,7 +258,7 @@ public: if (value != Value(ValueTraits::Redirect)) return value; // Found an existing value // We've been redirected to a new table. Help with the migration. - TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell)); + TURF_TRACE(ConcurrentMap_Linear, 16, "[get] was redirected", uptr(table), uptr(cell)); table->jobCoordinator.participate(); // Try again in the new table. } diff --git a/junction/details/Grampa.h b/junction/details/Grampa.h index 9f85df2..2fe2cc2 100644 --- a/junction/details/Grampa.h +++ b/junction/details/Grampa.h @@ -696,10 +696,8 @@ void Grampa::TableMigration::run() { // However, multiple threads can independently detect a failed migration at the same time. TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads - // before - // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from - // the thread - // that deals with it. + // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often + // different from the thread that deals with it. // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins. sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed); if (oldIndex >= 0) diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h index 800d737..75d91fb 100644 --- a/junction/details/LeapFrog.h +++ b/junction/details/LeapFrog.h @@ -24,6 +24,9 @@ #include #include +// Enable this to force migration overflows (for test purposes): +#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0 + namespace junction { namespace details { @@ -349,7 +352,15 @@ struct LeapFrog { } float inUseRatio = float(inUseCells) / CellsInUseSample; float estimatedInUse = (sizeMask + 1) * inUseRatio; - ureg nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)); +#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. 
+ static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } +#endif + ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); beginTableMigrationToSize(map, table, nextTableSize); } }; // LeapFrog diff --git a/junction/details/Linear.cpp b/junction/details/Linear.cpp index d731567..299f654 100644 --- a/junction/details/Linear.cpp +++ b/junction/details/Linear.cpp @@ -17,7 +17,7 @@ namespace junction { namespace details { -TURF_TRACE_DEFINE_BEGIN(Linear, 22) // autogenerated by TidySource.py +TURF_TRACE_DEFINE_BEGIN(Linear, 26) // autogenerated by TidySource.py TURF_TRACE_DEFINE("[find] called") TURF_TRACE_DEFINE("[find] found existing cell") TURF_TRACE_DEFINE("[insert] called") @@ -26,21 +26,25 @@ TURF_TRACE_DEFINE("[insert] ran out of cellsRemaining") TURF_TRACE_DEFINE("[insert] reserved cell") TURF_TRACE_DEFINE("[insert] detected race to reserve cell") TURF_TRACE_DEFINE("[insert] race reserved same hash") -TURF_TRACE_DEFINE("[beginTableMigration] called") -TURF_TRACE_DEFINE("[beginTableMigration] new migration already exists") -TURF_TRACE_DEFINE("[beginTableMigration] new migration already exists (double-checked)") -TURF_TRACE_DEFINE("[table] restarting valuesRemaining CAS loop") -TURF_TRACE_DEFINE("[table] valuesRemaining CAS failed") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") +TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") TURF_TRACE_DEFINE("[migrateRange] race to insert key") TURF_TRACE_DEFINE("[migrateRange] race to insert value") +TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect") +TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected") TURF_TRACE_DEFINE("[migrateRange] racing update was erase") TURF_TRACE_DEFINE("[migrateRange] race to update migrated value") TURF_TRACE_DEFINE("[TableMigration::run] already ended") TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set") +TURF_TRACE_DEFINE("[TableMigration::run] destination overflow") +TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed") TURF_TRACE_DEFINE("[TableMigration::run] out of migration units") TURF_TRACE_DEFINE("[TableMigration::run] not the last worker") -TURF_TRACE_DEFINE_END(Linear, 22) +TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started") +TURF_TRACE_DEFINE_END(Linear, 26) } // namespace details } // namespace junction diff --git a/junction/details/Linear.h b/junction/details/Linear.h index 991f853..40fc450 100644 --- a/junction/details/Linear.h +++ b/junction/details/Linear.h @@ -24,10 +24,13 @@ #include #include +// Enable this to force migration overflows (for test purposes): +#define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0 + namespace junction { namespace details { -TURF_TRACE_DECLARE(Linear, 22) +TURF_TRACE_DECLARE(Linear, 26) template struct Linear { @@ -38,10 +41,7 @@ struct Linear { static const ureg InitialSize = 8; static const ureg TableMigrationUnitSize = 32; - static const ureg LinearSearchLimit = 128; - static const ureg CellsInUseSample = LinearSearchLimit; - TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links - TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed 
search chain + static const ureg CellsInUseSample = 256; struct Cell { turf::Atomic hash; @@ -50,21 +50,17 @@ struct Linear { struct Table { const ureg sizeMask; // a power of two minus one - const ureg limitNumValues; turf::Atomic cellsRemaining; - turf::Atomic valuesRemaining; turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration - Table(ureg sizeMask, ureg limitNumValues) - : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues), - valuesRemaining(limitNumValues) { + Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) { } - static Table* create(ureg tableSize, ureg limitNumValues) { + static Table* create(ureg tableSize) { TURF_ASSERT(turf::util::isPowerOf2(tableSize)); Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize); - new (table) Table(tableSize - 1, limitNumValues); + new (table) Table(tableSize - 1); for (ureg j = 0; j < tableSize; j++) { table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash); table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue)); @@ -88,27 +84,51 @@ struct Linear { class TableMigration : public SimpleJobCoordinator::Job { public: + struct Source { + Table* table; + turf::Atomic sourceIndex; + }; + Map& m_map; - Table* m_source; - turf::Atomic m_sourceIndex; Table* m_destination; turf::Atomic m_workerStatus; // number of workers + end flag + turf::Atomic m_overflowed; turf::Atomic m_unitsRemaining; + ureg m_numSources; + + TableMigration(Map& map) : m_map(map) { + } - TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) { - // Caller is responsible for filling in source & destination + static TableMigration* create(Map& map, ureg numSources) { + TableMigration* migration = + (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); + new (migration) TableMigration(map); + migration->m_workerStatus.storeNonatomic(0); + migration->m_overflowed.storeNonatomic(false); + migration->m_unitsRemaining.storeNonatomic(0); + migration->m_numSources = numSources; + // Caller is responsible for filling in sources & destination + return migration; } virtual ~TableMigration() TURF_OVERRIDE { - // Destroy source table. - m_source->destroy(); } void destroy() { - delete this; + // Destroy all source tables. + for (ureg i = 0; i < m_numSources; i++) + if (getSources()[i].table) + getSources()[i].table->destroy(); + // Delete the migration object itself. + this->TableMigration::~TableMigration(); + TURF_HEAP.free(this); + } + + Source* getSources() const { + return (Source*) (this + 1); } - bool migrateRange(ureg startIdx); + bool migrateRange(Table* srcTable, ureg startIdx); virtual void run() TURF_OVERRIDE; }; @@ -150,7 +170,7 @@ struct Linear { } if (probeHash == KeyTraits::NullHash) { // It's an empty cell. Try to reserve it. - // But first, decrement cellsRemaining to ensure we have permission to create new getCells(). + // But first, decrement cellsRemaining to ensure we have permission to create new cells. s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed); if (prevCellsRemaining <= 0) { // Table is overpopulated. @@ -177,71 +197,69 @@ struct Linear { } } - static void beginTableMigration(Map& map, Table* table) { + static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) { // Create new migration by DCLI. 
- TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0); + TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0); SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume(); if (job) { - TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0); + TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0); } else { turf::LockGuard guard(table->mutex); job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. if (job) { - TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0); + TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0); } else { - // Determine new migration size and cap the number of values that can be added concurrent to the migration. - sreg oldValuesLimit = table->limitNumValues; - sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed); - sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining; - calculateNextTableSize: - sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2); - sreg nextLimitNumValues = nextTableSize * 3 / 4; - if (nextLimitNumValues < oldValuesLimit) { - // Set the new limitNumValues on the *current* table. - // This prevents other threads, while the migration is in progress, from concurrently - // re-inserting more values than the new table can hold. - // To set the new limitNumValues on the current table in an atomic fashion, - // we update its valuesRemaining via CAS loop: - for (;;) { - // We must recalculate desiredValuesRemaining on each iteration of the CAS loop - oldValuesInUse = oldValuesLimit - oldValuesRemaining; - sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse; - if (desiredValuesRemaining < 0) { - TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, - desiredValuesRemaining); - // Must recalculate nextTableSize. Goto, baby! - goto calculateNextTableSize; - } - if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, - turf::Relaxed)) - break; // Success! - // CAS failed because table->valuesRemaining was modified by another thread. - // An updated value has been reloaded into oldValuesRemaining (modified by reference). - // Recalculate desiredValuesRemaining to account for the updated value, and try again. - TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining); - } - } - // Now we are assured that the new table will not become overpopulated during the migration process. // Create new migration. - TableMigration* migration = new TableMigration(map); - migration->m_source = table; - migration->m_destination = Table::create(nextTableSize, nextLimitNumValues); + TableMigration* migration = TableMigration::create(map, 1); migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits()); + migration->getSources()[0].table = table; + migration->getSources()[0].sourceIndex.storeNonatomic(0); + migration->m_destination = Table::create(nextTableSize); // Publish the new migration. table->jobCoordinator.storeRelease(migration); } } } + + static void beginTableMigration(Map& map, Table* table) { + // Estimate number of cells in use based on a small sample. 
+ ureg idx = 0; + ureg sampleSize = turf::util::min(table->sizeMask + 1, CellsInUseSample); + ureg inUseCells = 0; + for (; idx < sampleSize; idx++) { + Cell* cell = table->getCells() + idx; + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + } + float inUseRatio = float(inUseCells) / sampleSize; + float estimatedInUse = (table->sizeMask + 1) * inUseRatio; +#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. + static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } +#endif + ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + beginTableMigrationToSize(map, table, nextTableSize); + } }; // Linear template -bool Linear::TableMigration::migrateRange(ureg startIdx) { - ureg srcSizeMask = m_source->sizeMask; +bool Linear::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { + ureg srcSizeMask = srcTable->sizeMask; ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1); sreg valuesMigrated = 0; // Iterate over source range. for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) { - Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask); + Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask); Hash srcHash; Value srcValue; // Fetch the srcHash and srcValue. @@ -253,12 +271,12 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) { srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); if (srcValue == Value(ValueTraits::Redirect)) { // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx); + TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); break; } if (srcValue == Value(ValueTraits::NullValue)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx); + TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); // Otherwise, somebody just claimed the cell. Read srcHash again... } else { // Check for deleted/uninitialized value. @@ -267,21 +285,38 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) { // Try to put a Redirect marker. if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx); + TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + if (srcValue == Value(ValueTraits::Redirect)) { + // FIXME: I don't think this will happen. Investigate & change to assert + TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + break; + } + } else if (srcValue == Value(ValueTraits::Redirect)) { + // srcValue is already marked Redirect due to previous incomplete migration. 
+                    TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+                    break;
+                }
                 // We've got a key/value pair to migrate.
                 // Reserve a destination cell in the destination.
                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
-                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
                 Cell* dstCell;
                 InsertResult result = insert(srcHash, m_destination, dstCell);
                 // During migration, a hash can only exist in one place among all the source tables,
                 // and it is only migrated by one thread. Therefore, the hash will never already exist
                 // in the destination table:
                 TURF_ASSERT(result != InsertResult_AlreadyFound);
-                TURF_ASSERT(result != InsertResult_Overflow);
+                if (result == InsertResult_Overflow) {
+                    // Destination overflow.
+                    // This can happen for several reasons. For example, the source table could have
+                    // consisted entirely of deleted cells when it overflowed, resulting in a small destination
+                    // table size, but then another thread could re-insert all the same hashes
+                    // before the migration completed.
+                    // Caller will cancel the current migration and begin a new one.
+                    return false;
+                }
                 // Migrate the old value to the new cell.
                 for (;;) {
                     // Copy srcValue to the destination.
@@ -296,13 +331,38 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) {
                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                         // by a late-arriving erase.
                         if (srcValue == Value(ValueTraits::NullValue))
-                            TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
-                        else
-                            valuesMigrated++;
+                            TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
                         break;
                     }
                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
-                    TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
                     srcValue = doubleCheckedSrcValue;
                 }
                 // Cell successfully migrated. Proceed to next source cell.
@@ -310,9 +343,6 @@ bool Linear::TableMigration::migrateRange(ureg startIdx) {
             }
         }
     }
-    sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
-    TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
-    TURF_UNUSED(prevValuesRemaining);
     // Range has been migrated successfully.
     return true;
 }
@@ -324,32 +354,55 @@ void Linear::TableMigration::run() {
     do {
         if (probeStatus & 1) {
             // End flag is already set, so do nothing.
-            TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
+            TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
             return;
         }
     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
     // # of workers has been incremented, and the end flag is clear.
     TURF_ASSERT((probeStatus & 1) == 0);
-    // Loop over all migration units in the source table.
-    for (;;) {
-        if (m_workerStatus.load(turf::Relaxed) & 1) {
-            TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
-            goto endMigration;
-        }
-        ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
-        if (startIdx >= m_source->sizeMask + 1)
-            break; // No more migration units.
-        migrateRange(startIdx);
-        sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
-        TURF_ASSERT(prevRemaining > 0);
-        if (prevRemaining == 1) {
-            // That was the last chunk to migrate.
-            m_workerStatus.fetchOr(1, turf::Relaxed);
-            goto endMigration;
+    // Iterate over all source tables.
+    for (ureg s = 0; s < m_numSources; s++) {
+        Source& source = getSources()[s];
+        // Loop over all migration units in this source table.
+        for (;;) {
+            if (m_workerStatus.load(turf::Relaxed) & 1) {
+                TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+                goto endMigration;
+            }
+            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+            if (startIdx >= source.table->sizeMask + 1)
+                break; // No more migration units in this table. Try next source table.
+            bool overflowed = !migrateRange(source.table, startIdx);
+            if (overflowed) {
+                // *** FAILED MIGRATION ***
+                // TableMigration failed due to destination table overflow.
+                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+                // hence m_unitsRemaining won't reach zero.
+                // However, multiple threads can independently detect a failed migration at the same time.
+                TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+                // The reason we store overflowed in a shared variable is because we must flush all the worker threads
+                // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often
+                // different from the thread that deals with it.
+                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+                if (oldOverflowed)
+                    TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+                               uptr(oldOverflowed));
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
+            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+            TURF_ASSERT(prevRemaining > 0);
+            if (prevRemaining == 1) {
+                // *** SUCCESSFUL MIGRATION ***
+                // That was the last chunk to migrate.
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
         }
     }
-    TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
+    TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
 endMigration:
     // Decrement the shared # of workers.
@@ -357,16 +410,48 @@ endMigration:
                                           2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
     if (probeStatus >= 4) {
         // There are other workers remaining. Return here so that only the very last worker will proceed.
-        TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+        TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
         return;
     }
     // We're the very last worker thread.
-    // Publish the new subtree.
+    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
     TURF_ASSERT(probeStatus == 3);
-    m_map.publishTableMigration(this);
-    // End the jobCoodinator.
-    m_source->jobCoordinator.end();
+    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+    if (!overflowed) {
+        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+        m_map.publishTableMigration(this);
+        // End the jobCoordinator.
+ getSources()[0].table->jobCoordinator.end(); + } else { + // The migration failed due to the overflow of the destination table. + Table* origTable = getSources()[0].table; + turf::LockGuard guard(origTable->mutex); + SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); + if (checkedJob != this) { + TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + uptr(checkedJob)); + } else { + TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); + // Double the destination table size. + migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2); + // Transfer source tables to the new migration. + for (ureg i = 0; i < m_numSources; i++) { + migration->getSources()[i].table = getSources()[i].table; + getSources()[i].table = NULL; + migration->getSources()[i].sourceIndex.storeNonatomic(0); + } + migration->getSources()[m_numSources].table = m_destination; + migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0); + // Calculate total number of migration units to move. + ureg unitsRemaining = 0; + for (ureg s = 0; s < migration->m_numSources; s++) + unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits(); + migration->m_unitsRemaining.storeNonatomic(unitsRemaining); + // Publish the new migration. + origTable->jobCoordinator.storeRelease(migration); + } + } // We're done with this TableMigration. Queue it for GC. DefaultQSBR.enqueue(&TableMigration::destroy, this);
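
For reference, a minimal sketch of a harness that exercises the new migration-overflow path. It assumes JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS has been switched from 0 to 1 in junction/details/Linear.h, and that ConcurrentMap_Linear exposes the same assign()/get()/erase() convenience calls as the other junction maps; neither appears in this patch, so treat both as assumptions rather than part of the change.

    // Hypothetical harness (not part of this patch). With the forced-overflow #define enabled,
    // beginTableMigration() periodically undersizes the destination table, so TableMigration::run()
    // hits the InsertResult_Overflow recovery path and restarts with a doubled destination.
    #include <junction/ConcurrentMap_Linear.h>
    #include <cassert>

    int main() {
        junction::ConcurrentMap_Linear<turf::u32, turf::u32> map; // values 0 and 1 are reserved (null/redirect)
        for (turf::u32 i = 1; i <= 200000; i++) {
            map.assign(i, i + 100);   // assign()/erase() assumed to match the other junction maps
            if ((i & 7) == 0)
                map.erase(i - 4);     // deleted cells lower the in-use sample, making undersized destinations likelier
        }
        assert(map.get(1) == 101);
        // A real harness would also register each thread with junction's QSBR and call its periodic
        // update so retired tables and TableMigrations are reclaimed.
        return 0;
    }

The same pattern applies to JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS in junction/details/LeapFrog.h.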