GrampaCounter numFlatTrees;
GrampaCounter numFlatTreeMigrations;
- static GrampaStats Instance; // Zero-initialized
+ static GrampaStats Instance; // Zero-initialized
};
#endif
TURF_TRACE_DECLARE(Grampa, 37)
-template<class Map>
+template <class Map>
struct Grampa {
typedef typename Map::Hash Hash;
typedef typename Map::Value Value;
static const ureg FlatTreeMigrationUnitSize = 32;
static const ureg LinearSearchLimit = 128;
static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
static const ureg MinTableSize = 8;
// eg. If the entire map is stored in a single table, then Table::shift == HASH_BITS.
// If the entire map is stored in two tables, then Table::shift == (HASH_BITS - 1) for each table.
// FlatTree::shift is always <= Table::shift for all the tables it contains.
- const ureg sizeMask; // a power of two minus one
+ const ureg sizeMask; // a power of two minus one
const Hash baseHash;
const ureg unsafeRangeShift;
- junction::striped::ManualResetEvent isPublished; // To prevent publishing a subtree before its parent is published (happened in testing)
- junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+ junction::striped::ManualResetEvent
+ isPublished; // To prevent publishing a subtree before its parent is published (happened in testing)
+ junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
- Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift) : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) {
+ Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift)
+ : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) {
}
static Table* create(ureg tableSize, ureg baseHash, ureg unsafeShift) {
TURF_ASSERT(tableSize >= 4);
ureg numGroups = tableSize >> 2;
Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
- new(table) Table(tableSize - 1, baseHash, (u8) unsafeShift);
+ new (table) Table(tableSize - 1, baseHash, (u8) unsafeShift);
for (ureg i = 0; i < numGroups; i++) {
CellGroup* group = table->getCellGroups() + i;
for (ureg j = 0; j < 4; j++) {
};
Map& m_map;
- Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree.
+ Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree.
// If m_numDestinations == 1, m_shift == 0.
- // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit: 1 << (HASH_BITS - m_shift).
+ // Otherwise, m_shift tells (indirectly) the size of the flattree in which our subtree would exactly fit:
+ // 1 << (HASH_BITS - m_shift).
// This ensures that m_shift is always less than sizeof(Hash) * 8, so that shifting by m_shift is not undefined behavior.
// To determine the subtree index for a hash during migration, we use: (hash >> m_shift) & (m_numDestinations - 1)
// A mask is used since we are only migrating a subtree -- not necessarily the entire map.
ureg m_safeShift;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
turf::Atomic<sreg> m_overflowTableIndex;
turf::Atomic<sreg> m_unitsRemaining;
ureg m_numSources;
- ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated.
+ ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated.
TableMigration(Map& map) : m_map(map) {
}
static TableMigration* create(Map& map, ureg numSources, ureg numDestinations) {
- TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations);
- new(migration) TableMigration(map);
+ TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(
+ sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations);
+ new (migration) TableMigration(map);
migration->m_workerStatus.storeNonatomic(0);
migration->m_overflowTableIndex.storeNonatomic(-1);
migration->m_unitsRemaining.storeNonatomic(0);
migration->m_numSources = numSources;
migration->m_numDestinations = numDestinations;
- // Caller is responsible for filling in source & destination pointers
#if JUNCTION_TRACK_GRAMPA_STATS
GrampaStats::Instance.numTableMigrations.increment();
#endif
+ // Caller is responsible for filling in source & destination pointers
return migration;
}
// Each time the flattree doubles in size, shift decreases by 1.
const ureg safeShift;
junction::striped::Mutex mutex;
- FlatTreeMigration* migration; // Protected by mutex
+ FlatTreeMigration* migration; // Protected by mutex
FlatTree(ureg safeShift) : safeShift(safeShift), migration(NULL) {
// A FlatTree always has at least two tables, so the shift is always safe.
TURF_ASSERT(safeShift < sizeof(Hash) * 8);
ureg numLeaves = (Hash(-1) >> safeShift) + 1;
FlatTree* flatTree = (FlatTree*) TURF_HEAP.alloc(sizeof(FlatTree) + sizeof(turf::Atomic<Table*>) * numLeaves);
- new(flatTree) FlatTree(safeShift);
- // Caller will initialize flatTree->getTables()
+ new (flatTree) FlatTree(safeShift);
#if JUNCTION_TRACK_GRAMPA_STATS
GrampaStats::Instance.numFlatTrees.increment();
#endif
+ // Caller will initialize flatTree->getTables()
return flatTree;
}
}
// FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult {
- InsertResult_AlreadyFound,
- InsertResult_InsertedNew,
- InsertResult_Overflow
- };
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
static InsertResult insert(Hash hash, Table* table, ureg sizeMask, Cell*& cell, ureg& overflowIdx) {
TURF_TRACE(Grampa, 3, "[insert] called", uptr(table), hash);
TURF_ASSERT(table);
probeHash = cell->hash.load(turf::Acquire);
} while (probeHash == KeyTraits::NullHash);
}
- TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
+ TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
if (probeHash == hash) {
TURF_TRACE(Grampa, 8, "[insert] found in probe chain", uptr(table), idx);
return InsertResult_AlreadyFound;
// Reached the end of the link chain for this bucket.
// Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
ureg prevLinkIdx = idx;
- TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
+ TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
while (linearProbesRemaining-- > 0) {
idx++;
TURF_TRACE(Grampa, 9, "[insert] reserved cell", uptr(table), idx);
TURF_ASSERT(probeDelta == 0);
u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
// Note: another thread could actually set the link on our behalf (see below).
-#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
#else
prevLink->store(desiredDelta, turf::Relaxed);
-#endif
+#endif
return InsertResult_InsertedNew;
} else {
TURF_TRACE(Grampa, 10, "[insert] race to reserve cell", uptr(table), idx);
// there's no guarantee that our own link chain will be well-formed by the time this function returns.
// (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
+#if TURF_WITH_ASSERTS
probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
if (probeDelta == 0)
TURF_TRACE(Grampa, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
#else
prevLink->store(desiredDelta, turf::Relaxed);
-#endif
- goto followLink; // Try to follow link chain for the bucket again.
+#endif
+ goto followLink; // Try to follow link chain for the bucket again.
}
// Continue linear search...
}
TURF_TRACE(Grampa, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
} else {
turf::LockGuard<junction::striped::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
if (job) {
TURF_TRACE(Grampa, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
} else {
migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
migration->getSources()[0].table = table;
migration->getSources()[0].sourceIndex.storeNonatomic(0);
- ureg subRangeShift = table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range)
+ ureg subRangeShift =
+ table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range)
ureg hashOffsetDelta = subRangeShift < (sizeof(Hash) * 8) ? (ureg(1) << subRangeShift) : 0;
for (ureg i = 0; i < numDestinations; i++) {
- migration->getDestinations()[i] = Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift);
+ migration->getDestinations()[i] =
+ Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift);
}
// Publish the new migration.
table->jobCoordinator.storeRelease(migration);
}
beginTableMigrationToSize(map, table, nextTableSize, splitShift);
}
-
+
static FlatTreeMigration* createFlatTreeMigration(Map& map, FlatTree* flatTree, ureg shift) {
turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
if (!flatTree->migration) {
static FlatTreeMigration* getExistingFlatTreeMigration(FlatTree* flatTree) {
turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
- TURF_ASSERT(flatTree->migration); // Must already exist!
+ TURF_ASSERT(flatTree->migration); // Must already exist!
return flatTree->migration;
}
}; // Grampa
// Return index of the destination table that overflowed, or -1 if none
-template<class Map>
+template <class Map>
sreg Grampa<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
ureg srcSizeMask = srcTable->sizeMask;
ureg safeShift = m_safeShift;
srcHash = srcCell->hash.load(turf::Relaxed);
if (srcHash == KeyTraits::NullHash) {
// An unused cell. Try to put a Redirect marker in its value.
- srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
if (srcValue == Value(ValueTraits::Redirect)) {
// srcValue is already marked Redirect due to previous incomplete migration.
TURF_TRACE(Grampa, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
break;
}
if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Grampa, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
// Otherwise, somebody just claimed the cell. Read srcHash again...
} else {
if (srcValue == Value(ValueTraits::NullValue)) {
// Try to put a Redirect marker.
if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
TURF_TRACE(Grampa, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
if (srcValue == Value(ValueTraits::Redirect)) {
// FIXME: I don't think this will happen. Investigate & change to assert
TURF_TRACE(Grampa, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
break;
}
-
+
// We've got a key/value pair to migrate.
// Reserve a destination cell in dstTable.
TURF_ASSERT(srcHash != KeyTraits::NullHash);
// Copy srcValue to the destination.
dstCell->value.store(srcValue, turf::Relaxed);
// Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
if (doubleCheckedSrcValue == srcValue) {
// No racing writes to the src. We've successfully placed the Redirect marker.
// srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
}
ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
if (startIdx >= source.table->sizeMask + 1)
- break; // No more migration units in this table. Try next source table.
+ break; // No more migration units in this table. Try next source table.
sreg overflowTableIndex = migrateRange(source.table, startIdx);
- if (overflowTableIndex >= 0) {
+ if (overflowTableIndex >= 0) {
// *** FAILED MIGRATION ***
// TableMigration failed due to destination table overflow.
- // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
// However, multiple threads can independently detect a failed migration at the same time.
TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
- // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads before
- // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread
+ // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads
+ // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often
+ // different from the thread that deals with it.
// Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins.
sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed);
if (oldIndex >= 0)
- TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex), uptr(oldIndex));
+ TURF_TRACE(Grampa, 29, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex),
+ uptr(oldIndex));
m_workerStatus.fetchOr(1, turf::Relaxed);
goto endMigration;
}
endMigration:
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
+ probeStatus =
+ m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
TURF_TRACE(Grampa, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
// We're the very last worker thread.
// Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
TURF_ASSERT(probeStatus == 3);
- sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point
+ sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point
if (overflowTableIndex < 0) {
// The migration succeeded. This is the most likely outcome. Publish the new subtree.
m_map.publishTableMigration(this);
turf::LockGuard<junction::striped::Mutex> guard(origTable->mutex);
SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
if (checkedJob != this) {
- TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ uptr(checkedJob));
} else {
TableMigration* migration;
Table* overflowedTable = getDestinations()[overflowTableIndex];
if (overflowedTable->sizeMask + 1 < LeafSize) {
// The entire map is contained in a small table.
- TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 33, "[TableMigration::run] overflow occured in a small map", uptr(origTable),
+ uptr(checkedJob));
TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8);
TURF_ASSERT(overflowedTable->baseHash == 0);
TURF_ASSERT(m_numDestinations == 1);
migration->m_baseHash = 0;
migration->m_safeShift = 0;
// Double the destination table size.
- migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash, overflowedTable->unsafeRangeShift);
+ migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash,
+ overflowedTable->unsafeRangeShift);
} else {
// The overflowed table is already the size of a leaf. Split it into two ranges.
if (count == 1) {
- TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 34, "[TableMigration::run] doubling subtree size after failure", uptr(origTable),
+ uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2);
migration->m_baseHash = m_baseHash;
migration->m_safeShift = getUnsafeShift() - 1;
}
count = 2;
} else {
- TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable), uptr(checkedJob));
+ TURF_TRACE(Grampa, 35, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable),
+ uptr(checkedJob));
migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations);
migration->m_baseHash = m_baseHash;
migration->m_safeShift = m_safeShift;
migration->getDestinations()[lo + i] = splitTable1;
}
ureg halfNumHashes = ureg(1) << (origTable->unsafeRangeShift - 1);
- Table* splitTable2 = Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1);
+ Table* splitTable2 =
+ Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1);
for (; i < count; i++) {
migration->getDestinations()[lo + i] = splitTable2;
}
DefaultQSBR.enqueue(&TableMigration::destroy, this);
}
-template<class Map>
+template <class Map>
void Grampa<Map>::FlatTreeMigration::run() {
// Conditionally increment the shared # of workers.
ureg probeStatus = m_workerStatus.load(turf::Relaxed);
for (;;) {
ureg srcStart = m_sourceIndex.fetchAdd(FlatTreeMigrationUnitSize, turf::Relaxed);
if (srcStart >= srcSize)
- break; // No more migration units in this flattree.
+ break; // No more migration units in this flattree.
// Migrate this range
ureg srcEnd = turf::util::min(srcSize, srcStart + FlatTreeMigrationUnitSize);
ureg dst = srcStart * repeat;
}
// Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
if (probeStatus >= 4) {
// There are other workers remaining. Return here so that only the very last worker will proceed.
return;
// We're the very last worker thread.
// Publish the new flattree.
- TURF_ASSERT(probeStatus == 3); // End flag must be set
+ TURF_ASSERT(probeStatus == 3); // End flag must be set
m_map.publishFlatTreeMigration(this);
m_completed.signal();