TURF_TRACE_DECLARE(Linear, 22)
-template<class Map>
+template <class Map>
struct Linear {
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
static const ureg TableMigrationUnitSize = 32;
static const ureg LinearSearchLimit = 128;
static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
struct Cell {
};
struct Table {
- const ureg sizeMask; // a power of two minus one
+ const ureg sizeMask; // a power of two minus one
const ureg limitNumValues;
turf::Atomic<sreg> cellsRemaining;
turf::Atomic<sreg> valuesRemaining;
- turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+ turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
- Table(ureg sizeMask, ureg limitNumValues) : sizeMask(sizeMask), limitNumValues(limitNumValues),
- cellsRemaining(limitNumValues), valuesRemaining(limitNumValues) {
+ Table(ureg sizeMask, ureg limitNumValues)
+ : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
+ valuesRemaining(limitNumValues) {
}
static Table* create(ureg tableSize, ureg limitNumValues) {
TURF_ASSERT(turf::util::isPowerOf2(tableSize));
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
- new(table) Table(tableSize - 1, limitNumValues);
+ new (table) Table(tableSize - 1, limitNumValues);
for (ureg j = 0; j < tableSize; j++) {
table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
Table* m_source;
turf::Atomic<ureg> m_sourceIndex;
Table* m_destination;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<sreg> m_unitsRemaining;
TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
}
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult {
- InsertResult_AlreadyFound,
- InsertResult_InsertedNew,
- InsertResult_Overflow
- };
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
TURF_ASSERT(table);
uptr probeHash = cell->hash.load(turf::Relaxed);
if (probeHash == hash) {
TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
- return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
+ return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
}
if (probeHash == KeyTraits::NullHash) {
// It's an empty cell. Try to reserve it.
if (prevCellsRemaining <= 0) {
// Table is overpopulated.
TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
- table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
+ table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
return InsertResult_Overflow;
}
// Try to reserve this cell.
}
// There was a race and another thread reserved that cell from under us.
TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
- table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
+ table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
if (prevHash == hash) {
TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
- return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
+ return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
}
}
// Try again in the next cell.
TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
} else {
turf::LockGuard<turf::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
} else {
// re-inserting more values than the new table can hold.
// To set the new limitNumValues on the current table in an atomic fashion,
// we update its valuesRemaining via CAS loop:
- for(;;) {
+ for (;;) {
// We must recalculate desiredValuesRemaining on each iteration of the CAS loop
oldValuesInUse = oldValuesLimit - oldValuesRemaining;
sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
if (desiredValuesRemaining < 0) {
- TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues, desiredValuesRemaining);
+ TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
+ desiredValuesRemaining);
// Must recalculate nextTableSize. Goto, baby!
goto calculateNextTableSize;
}
- if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed, turf::Relaxed))
+ if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
+ turf::Relaxed))
break; // Success!
// CAS failed because table->valuesRemaining was modified by another thread.
// An updated value has been reloaded into oldValuesRemaining (modified by reference).
}
}; // Linear
-template<class Map>
+template <class Map>
bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
ureg srcSizeMask = m_source->sizeMask;
ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
- srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
}
-
+
// We've got a key/value pair to migrate.
// Reserve a destination cell in the destination.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
- TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+ TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
Cell* dstCell;
InsertResult result = insert(srcHash, m_destination, dstCell);
// During migration, a hash can only exist in one place among all the source tables,
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
}
ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= m_source->sizeMask + 1)
- break; // No more migration units.
+ break; // No more migration units.
migrateRange(startIdx);
sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
TURF_ASSERT(prevRemaining > 0);
endMigration:
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));