X-Git-Url: http://plrg.eecs.uci.edu/git/?p=junction.git;a=blobdiff_plain;f=junction%2Fdetails%2FLeapFrog.h;fp=junction%2Fdetails%2FLeapFrog.h;h=e5af7b8211c7d74991a0a6c682d64db121c7f23d;hp=7b1e39574ca8d3a73a1ef032e6d8e7a42177d7e8;hb=e8d98a04ab8e1fc34bef37bd9b802c49510c9e83;hpb=4249e1abd767b5cbbe9b722710b90ed49cd0390a

diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h
index 7b1e395..e5af7b8 100644
--- a/junction/details/LeapFrog.h
+++ b/junction/details/LeapFrog.h
@@ -29,7 +29,7 @@ namespace details {
 
 TURF_TRACE_DECLARE(LeapFrog, 33)
 
-template<class Map>
+template <class Map>
 struct LeapFrog {
     typedef typename Map::Hash Hash;
     typedef typename Map::Value Value;
@@ -40,7 +40,7 @@ struct LeapFrog {
     static const ureg TableMigrationUnitSize = 32;
     static const ureg LinearSearchLimit = 128;
     static const ureg CellsInUseSample = LinearSearchLimit;
-    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
     TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
 
     struct Cell {
@@ -60,9 +60,9 @@ struct LeapFrog {
     };
 
     struct Table {
-        const ureg sizeMask; // a power of two minus one
-        turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
-        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+        const ureg sizeMask;                 // a power of two minus one
+        turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
+        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
 
         Table(ureg sizeMask) : sizeMask(sizeMask) {
         }
@@ -72,7 +72,7 @@ struct LeapFrog {
         TURF_ASSERT(tableSize >= 4);
         ureg numGroups = tableSize >> 2;
         Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
-        new(table) Table(tableSize - 1);
+        new (table) Table(tableSize - 1);
         for (ureg i = 0; i < numGroups; i++) {
             CellGroup* group = table->getCellGroups() + i;
             for (ureg j = 0; j < 4; j++) {
@@ -108,7 +108,7 @@ struct LeapFrog {
 
         Map& m_map;
         Table* m_destination;
-        turf::Atomic<ureg> m_workerStatus;      // number of workers + end flag
+        turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
         turf::Atomic<bool> m_overflowed;
         turf::Atomic<sreg> m_unitsRemaining;
         ureg m_numSources;
@@ -117,8 +117,9 @@ struct LeapFrog {
         TableMigration(Map& map) : m_map(map) {
         }
         static TableMigration* create(Map& map, ureg numSources) {
-            TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
-            new(migration) TableMigration(map);
+            TableMigration* migration =
+                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+            new (migration) TableMigration(map);
             migration->m_workerStatus.storeNonatomic(0);
             migration->m_overflowed.storeNonatomic(false);
             migration->m_unitsRemaining.storeNonatomic(0);
@@ -184,11 +185,7 @@ struct LeapFrog {
     };
 
     // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
-    enum InsertResult {
-        InsertResult_AlreadyFound,
-        InsertResult_InsertedNew,
-        InsertResult_Overflow
-    };
+    enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
     static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
         TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash);
         TURF_ASSERT(table);
@@ -239,7 +236,7 @@ struct LeapFrog {
                     probeHash = cell->hash.load(turf::Acquire);
                 } while (probeHash == KeyTraits::NullHash);
             }
-            TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0);  // Only hashes in same bucket can be linked
+            TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
             if (probeHash == hash) {
                 TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx);
                 return InsertResult_AlreadyFound;
@@ -248,7 +245,7 @@ struct LeapFrog {
             // Reached the end of the link chain for this bucket.
             // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
             ureg prevLinkIdx = idx;
-            TURF_ASSERT(sreg(maxIdx - idx) >= 0);  // Nobody would have linked an idx that's out of range.
+            TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
             ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
             while (linearProbesRemaining-- > 0) {
                 idx++;
@@ -262,7 +259,7 @@ struct LeapFrog {
                     TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx);
                     TURF_ASSERT(probeDelta == 0);
                     u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS 
+#if TURF_WITH_ASSERTS
                     probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
                     TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
 #else
@@ -288,15 +285,15 @@ struct LeapFrog {
             // there's no guarantee that our own link chain will be well-formed by the time this function returns.
             // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
             u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS 
+#if TURF_WITH_ASSERTS
             probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
             TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
             if (probeDelta == 0)
                 TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
 #else
             prevLink->store(desiredDelta, turf::Relaxed);
-#endif 
-            goto followLink;  // Try to follow link chain for the bucket again.
+#endif
+            goto followLink; // Try to follow link chain for the bucket again.
         }
         // Continue linear search...
     }
@@ -316,7 +313,7 @@ struct LeapFrog {
             TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
         } else {
             turf::LockGuard<turf::Mutex> guard(table->mutex);
-            job = table->jobCoordinator.loadConsume();  // Non-atomic would be sufficient, but that's OK.
+            job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
             if (job) {
                 TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
             } else {
@@ -357,7 +354,7 @@ struct LeapFrog {
     }
 }; // LeapFrog
 
-template<class Map>
+template <class Map>
 bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
     ureg srcSizeMask = srcTable->sizeMask;
     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
@@ -372,14 +369,15 @@ bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx)
             srcHash = srcCell->hash.load(turf::Relaxed);
             if (srcHash == KeyTraits::NullHash) {
                 // An unused cell. Try to put a Redirect marker in its value.
-                srcValue = srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+                srcValue =
+                    srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
                 if (srcValue == Value(ValueTraits::Redirect)) {
                     // srcValue is already marked Redirect due to previous incomplete migration.
                     TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
                     break;
                 }
                 if (srcValue == Value(ValueTraits::NullValue))
-                    break;  // Redirect has been placed. Break inner loop, continue outer loop.
+                    break; // Redirect has been placed. Break inner loop, continue outer loop.
                 TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
                 // Otherwise, somebody just claimed the cell. Read srcHash again...
             } else {
@@ -388,7 +386,7 @@ bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx)
                 if (srcValue == Value(ValueTraits::NullValue)) {
                     // Try to put a Redirect marker.
                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
-                        break;  // Redirect has been placed. Break inner loop, continue outer loop.
+                        break; // Redirect has been placed. Break inner loop, continue outer loop.
                     TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
                     if (srcValue == Value(ValueTraits::Redirect)) {
                         // FIXME: I don't think this will happen. Investigate & change to assert
@@ -400,7 +398,7 @@ bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx)
                     TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
                     break;
                 }
-                
+
                 // We've got a key/value pair to migrate.
                 // Reserve a destination cell in the destination.
                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
@@ -427,8 +425,10 @@ bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx)
                 // Copy srcValue to the destination.
                 dstCell->value.store(srcValue, turf::Relaxed);
                 // Try to place a Redirect marker in srcValue.
-                Value doubleCheckedSrcValue = srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
-                TURF_ASSERT(doubleCheckedSrcValue != Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+                Value doubleCheckedSrcValue =
+                    srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+                TURF_ASSERT(doubleCheckedSrcValue !=
+                            Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
                 if (doubleCheckedSrcValue == srcValue) {
                     // No racing writes to the src. We've successfully placed the Redirect marker.
                     // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
@@ -475,20 +475,23 @@ void LeapFrog<Map>::TableMigration::run() {
             }
             ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
             if (startIdx >= source.table->sizeMask + 1)
-                break;  // No more migration units in this table. Try next source table.
+                break; // No more migration units in this table. Try next source table.
             bool overflowed = !migrateRange(source.table, startIdx);
             if (overflowed) {
                 // *** FAILED MIGRATION ***
                 // TableMigration failed due to destination table overflow.
-                // No other thread can declare the migration successful at this point, because *this* unit will never complete, hence m_unitsRemaining won't reach zero.
+                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+                // hence m_unitsRemaining won't reach zero.
                 // However, multiple threads can independently detect a failed migration at the same time.
                 TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
                 // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
-                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from the thread
+                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+                // the thread
                 // that deals with it.
                 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
                 if (oldOverflowed)
-                    TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), uptr(oldOverflowed));
+                    TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+                               uptr(oldOverflowed));
                 m_workerStatus.fetchOr(1, turf::Relaxed);
                 goto endMigration;
             }
@@ -506,7 +509,8 @@ void LeapFrog<Map>::TableMigration::run() {
 
 endMigration:
     // Decrement the shared # of workers.
-    probeStatus = m_workerStatus.fetchSub(2, turf::AcquireRelease);  // AcquireRelease makes all previous writes visible to the last worker thread.
+    probeStatus = m_workerStatus.fetchSub(
+        2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
     if (probeStatus >= 4) {
         // There are other workers remaining. Return here so that only the very last worker will proceed.
         TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
@@ -516,7 +520,7 @@ endMigration:
     // We're the very last worker thread.
    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
     TURF_ASSERT(probeStatus == 3);
-    bool overflowed = m_overflowed.loadNonatomic();  // No racing writes at this point
+    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
     if (!overflowed) {
         // The migration succeeded. This is the most likely outcome. Publish the new subtree.
         m_map.publishTableMigration(this);
@@ -528,7 +532,8 @@ endMigration:
         turf::LockGuard<turf::Mutex> guard(origTable->mutex);
         SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
         if (checkedJob != this) {
-            TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), uptr(checkedJob));
+            TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+                       uptr(checkedJob));
         } else {
             TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
             // Double the destination table size.
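
A few of the concurrency patterns in this diff are worth spelling out, since only fragments of LeapFrog.h are visible above. First, the comment on Table::mutex ("to DCLI the TableMigration") refers to double-checked locking: beginTableMigrationToSize loads the jobCoordinator once without the lock, then re-checks it under table->mutex before creating a new TableMigration. The sketch below shows the same shape in portable C++11; the Job type, createJob factory, and globals are invented for illustration and are not Junction's API.

    #include <atomic>
    #include <mutex>

    struct Job {};

    std::atomic<Job*> g_job(nullptr); // stands in for the job stored in table->jobCoordinator
    std::mutex g_mutex;               // stands in for table->mutex

    Job* createJob() {
        return new Job; // stands in for TableMigration::create()
    }

    // Double-checked locking: the fast path is a single acquire load; the mutex
    // only serializes the rare creation path, so two concurrent callers cannot
    // both create and publish a job.
    Job* getOrCreateJob() {
        Job* job = g_job.load(std::memory_order_acquire);
        if (!job) {
            std::lock_guard<std::mutex> guard(g_mutex);
            job = g_job.load(std::memory_order_relaxed); // re-check under the lock
            if (!job) {
                job = createJob();
                g_job.store(job, std::memory_order_release); // publish
            }
        }
        return job;
    }

Junction's loadConsume corresponds to the acquire load here; consume ordering is weaker, but acquire is the portable stand-in.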
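
Second, migrateRange's per-cell protocol: copy the source value into the reserved destination cell, then compareExchange the source to the Redirect sentinel; if the CAS fails, a writer raced the migration and the copy is redone with the fresh value. Below is a reduced, single-cell sketch under invented names (kNullValue and kRedirect stand in for ValueTraits::NullValue and ValueTraits::Redirect); it illustrates the idea and is not Junction's implementation.

    #include <atomic>
    #include <cstdint>

    typedef std::uintptr_t Value;
    static const Value kNullValue = 0;        // stand-in for ValueTraits::NullValue
    static const Value kRedirect = ~Value(0); // stand-in for ValueTraits::Redirect

    // Copy one cell's value into dst, then seal src with the Redirect marker.
    // Returns true if a value was migrated; false if the cell was empty or had
    // already been redirected by a previous, incomplete migration.
    bool migrateCell(std::atomic<Value>& src, std::atomic<Value>& dst) {
        Value v = src.load(std::memory_order_relaxed);
        for (;;) {
            if (v == kRedirect)
                return false; // already redirected (the trace-19/23 paths above)
            if (v == kNullValue) {
                // Empty cell: place the marker so late writers see Redirect and
                // retry against the new table instead of writing here.
                if (src.compare_exchange_strong(v, kRedirect, std::memory_order_relaxed))
                    return false;
                continue; // CAS failed; v now holds the racing writer's value
            }
            dst.store(v, std::memory_order_relaxed);
            // Seal the source. On failure, v is refreshed and the copy is redone,
            // which is why the diff compares doubleCheckedSrcValue against srcValue.
            if (src.compare_exchange_strong(v, kRedirect, std::memory_order_relaxed))
                return true;
        }
    }

The TURF_ASSERT(doubleCheckedSrcValue != Redirect) in the diff reflects that only one thread migrates a given cell; the early kRedirect return here covers the already-redirected cases instead.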
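
Finally, m_workerStatus ("number of workers + end flag") packs two fields into one atomic word: bit 0 is the end flag, and the bits above it count workers in units of two. That is why the overflow path does fetchOr(1), why the shared exit does fetchSub(2, AcquireRelease), why probeStatus >= 4 means other workers are still active, and why the very last worker asserts probeStatus == 3 (one worker plus the end flag). A sketch of the same encoding with std::atomic; the struct and method names are illustrative, not Junction's.

    #include <atomic>
    #include <cstdint>

    // One atomic word: bit 0 = "end" flag, remaining bits = worker count * 2.
    struct WorkerStatus {
        std::atomic<std::uintptr_t> bits;

        WorkerStatus() : bits(0) {}

        // A worker joins: add 2, leaving the end flag untouched. Returns false
        // if the end flag was already set (nothing left to join).
        bool join() {
            std::uintptr_t prev = bits.fetch_add(2, std::memory_order_relaxed);
            return (prev & 1) == 0;
        }

        // Signal completion; workers already inside still finish their unit.
        void signalEnd() {
            bits.fetch_or(1, std::memory_order_relaxed);
        }

        // A worker leaves: subtract 2. Acquire-release makes this worker's writes
        // visible to whichever worker turns out to be the last one. prev == 3
        // means one worker (2) plus the end flag (1): we are that last worker.
        bool leaveAndIsLast() {
            std::uintptr_t prev = bits.fetch_sub(2, std::memory_order_acq_rel);
            return prev == 3;
        }
    };

A worker whose join() returns false must still call leaveAndIsLast() to drop its count; that mirrors the overflow path above, which raises the flag with fetchOr(1) and then funnels through the fetchSub(2) at endMigration like every other worker.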