namespace junction {
namespace details {
-TURF_TRACE_DECLARE(LeapFrog, 34)
+TURF_TRACE_DECLARE(LeapFrog, 33)
template <class Map>
struct LeapFrog {
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
- static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
- TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash);
+ static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
+ TURF_TRACE(LeapFrog, 3, "[insertOrFind] called", uptr(table), hash);
TURF_ASSERT(table);
TURF_ASSERT(hash != KeyTraits::NullHash);
ureg sizeMask = table->sizeMask;
Hash probeHash = cell->hash.load(turf::Relaxed);
if (probeHash == KeyTraits::NullHash) {
if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
- TURF_TRACE(LeapFrog, 4, "[insert] reserved first cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
// There are no links to set. We're done.
return InsertResult_InsertedNew;
} else {
- TURF_TRACE(LeapFrog, 5, "[insert] race to reserve first cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
// Fall through to check if it was the same hash...
}
}
if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 6, "[insert] found in first cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
return InsertResult_AlreadyFound;
}
// Cell was linked, but hash is not visible yet.
// We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
// just poll until it becomes visible.
- TURF_TRACE(LeapFrog, 7, "[insert] race to read hash", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
do {
probeHash = cell->hash.load(turf::Acquire);
} while (probeHash == KeyTraits::NullHash);
}
TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
return InsertResult_AlreadyFound;
}
} else {
// It's an empty cell. Try to reserve it.
if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
// Success. We've reserved the cell. Link it to previous cell in same bucket.
- TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
TURF_ASSERT(probeDelta == 0);
u8 desiredDelta = idx - prevLinkIdx;
#if TURF_WITH_ASSERTS
#endif
return InsertResult_InsertedNew;
} else {
- TURF_TRACE(LeapFrog, 10, "[insert] race to reserve cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
// Fall through to check if it's the same hash...
}
}
Hash x = (probeHash ^ hash);
// Check for same hash.
if (!x) {
- TURF_TRACE(LeapFrog, 11, "[insert] found outside probe chain", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
return InsertResult_AlreadyFound;
}
// Check for same bucket.
if ((x & sizeMask) == 0) {
- TURF_TRACE(LeapFrog, 12, "[insert] found late-arriving cell in same bucket", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
// Attempt to set the link on behalf of the late-arriving cell.
// This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
// there's no guarantee that our own link chain will be well-formed by the time this function returns.
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
if (probeDelta == 0)
- TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
+ TURF_TRACE(LeapFrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
#else
prevLink->store(desiredDelta, turf::Relaxed);
#endif
}
// Table is too full to insert.
overflowIdx = idx + 1;
- TURF_TRACE(LeapFrog, 14, "[insert] overflow", uptr(table), overflowIdx);
+ TURF_TRACE(LeapFrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
return InsertResult_Overflow;
}
}
}
}
- static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) {
- ureg nextTableSize;
- if (mustDouble) {
- TURF_TRACE(LeapFrog, 18, "[beginTableMigration] forced to double", 0, 0);
- nextTableSize = (table->sizeMask + 1) * 2;
- } else {
- // Estimate number of cells in use based on a small sample.
- ureg sizeMask = table->sizeMask;
- ureg idx = overflowIdx - CellsInUseSample;
- ureg inUseCells = 0;
- for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
- CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- Cell* cell = group->cells + (idx & 3);
- Value value = cell->value.load(turf::Relaxed);
- if (value == Value(ValueTraits::Redirect)) {
- // Another thread kicked off the jobCoordinator. The caller will participate upon return.
- TURF_TRACE(LeapFrog, 19, "[beginTableMigration] redirected while determining table size", 0, 0);
- return;
- }
- if (value != Value(ValueTraits::NullValue))
- inUseCells++;
- idx++;
+ static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
+ // Estimate number of cells in use based on a small sample.
+ ureg sizeMask = table->sizeMask;
+ ureg idx = overflowIdx - CellsInUseSample;
+ ureg inUseCells = 0;
+ for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
+ CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ Cell* cell = group->cells + (idx & 3);
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
}
- float inUseRatio = float(inUseCells) / CellsInUseSample;
- float estimatedInUse = (sizeMask + 1) * inUseRatio;
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
+ idx++;
+ }
+ float inUseRatio = float(inUseCells) / CellsInUseSample;
+ float estimatedInUse = (sizeMask + 1) * inUseRatio;
#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
- // Periodically underestimate the number of cells in use.
- // This exercises the code that handles overflow during migration.
- static ureg counter = 1;
- if ((++counter & 3) == 0) {
- estimatedInUse /= 4;
- }
-#endif
- nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ // Periodically underestimate the number of cells in use.
+ // This exercises the code that handles overflow during migration.
+ static ureg counter = 1;
+ if ((++counter & 3) == 0) {
+ estimatedInUse /= 4;
}
+#endif
+ ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
beginTableMigrationToSize(map, table, nextTableSize);
}
}; // LeapFrog
srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
// Check for deleted/uninitialized value.
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
- TURF_TRACE(LeapFrog, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
break;
}
} else if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
Cell* dstCell;
ureg overflowIdx;
- InsertResult result = insert(srcHash, m_destination, dstCell, overflowIdx);
+ InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
// During migration, a hash can only exist in one place among all the source tables,
// and it is only migrated by one thread. Therefore, the hash will never already exist
// in the destination table:
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
// by a late-arriving erase.
if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(LeapFrog, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
break;
}
// There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(LeapFrog, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+ TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
srcValue = doubleCheckedSrcValue;
}
// Cell successfully migrated. Proceed to next source cell.
do {
if (probeStatus & 1) {
// End flag is already set, so do nothing.
- TURF_TRACE(LeapFrog, 27, "[TableMigration::run] already ended", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
return;
}
} while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
// Loop over all migration units in this source table.
for (;;) {
if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(LeapFrog, 28, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
goto endMigration;
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
// No other thread can declare the migration successful at this point, because *this* unit will never complete,
// hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
- TURF_TRACE(LeapFrog, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
// The reason we store overflowed in a shared variable is because we must flush all the worker threads before
// we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
// the thread
// that deals with it.
bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
if (oldOverflowed)
- TURF_TRACE(LeapFrog, 30, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
uptr(oldOverflowed));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
}
}
- TURF_TRACE(LeapFrog, 31, "[TableMigration::run] out of migration units", uptr(this), 0);
+ TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
endMigration:
// Decrement the shared # of workers.
2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(LeapFrog, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
return;
}
turf::LockGuard<turf::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(LeapFrog, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
uptr(checkedJob));
} else {
TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);