X-Git-Url: http://plrg.eecs.uci.edu/git/?p=junction.git;a=blobdiff_plain;f=junction%2FConcurrentMap_Grampa.h;h=8d67f2219650c9a4b5603617489f3d95b46fc94d;hp=d7fa979ef699d5491acc6a7826d04bef15cf0c65;hb=6a7fe65045a50f9830aa674fcacf16f4e580eaa2;hpb=673a618f0fa0da41cf25897d40edbb0293b4765d diff --git a/junction/ConcurrentMap_Grampa.h b/junction/ConcurrentMap_Grampa.h index d7fa979..8d67f22 100644 --- a/junction/ConcurrentMap_Grampa.h +++ b/junction/ConcurrentMap_Grampa.h @@ -23,7 +23,7 @@ namespace junction { TURF_TRACE_DECLARE(ConcurrentMap_Grampa, 27) -template , class VT = DefaultValueTraits> +template , class VT = DefaultValueTraits > class ConcurrentMap_Grampa { public: typedef K Key; @@ -41,7 +41,7 @@ private: if (root & 1) { typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (root & ~ureg(1)); for (;;) { - ureg leafIdx = (hash >> flatTree->safeShift); + ureg leafIdx = ureg(hash >> flatTree->safeShift); table = flatTree->getTables()[leafIdx].load(turf::Relaxed); if (ureg(table) != Details::RedirectFlatTree) { sizeMask = (Details::LeafSize - 1); @@ -109,18 +109,20 @@ public: // This TableMigration replaces the entire map with a single table. TURF_ASSERT(migration->m_baseHash == 0); TURF_ASSERT(migration->m_numDestinations == 1); - ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root. + ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root. // Store the single table in m_root directly. typename Details::Table* newTable = migration->getDestinations()[0]; - m_root.store(uptr(newTable), turf::Release); // Make table contents visible + m_root.store(uptr(newTable), turf::Release); // Make table contents visible newTable->isPublished.signal(); if ((oldRoot & 1) == 0) { - TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", + uptr(migration), 0); // If oldRoot is a table, it must be the original source of the migration. TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table); // Don't GC it here. The caller will GC it since it's a source of the TableMigration. } else { - TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", + uptr(migration), 0); // The entire previous flattree is being replaced. Details::garbageCollectFlatTree((typename Details::FlatTree*) (oldRoot & ~ureg(1))); } @@ -128,7 +130,8 @@ public: } else { // We are either publishing a subtree of one or more tables, or replacing the entire map with multiple tables. // In either case, there will be a flattree after this function returns. - TURF_ASSERT(migration->m_safeShift < sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting + TURF_ASSERT(migration->m_safeShift < + sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting ureg oldRoot = m_root.load(turf::Consume); if ((oldRoot & 1) == 0) { // There's no flattree yet. This means the TableMigration is publishing the full range of hashes. @@ -138,7 +141,8 @@ public: TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table); // Furthermore, it is guaranteed that there are no racing writes to m_root. // Create a new flattree and store it to m_root. - TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", + uptr(migration), 0); typename Details::FlatTree* flatTree = Details::FlatTree::create(migration->m_safeShift); typename Details::Table* prevTable = NULL; for (ureg i = 0; i < migration->m_numDestinations; i++) { @@ -149,14 +153,15 @@ public: prevTable = newTable; } } - m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables + m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables // Caller will GC the TableMigration. // Caller will also GC the old oldRoot since it's a source of the TableMigration. } else { // There is an existing flattree, and we are publishing one or more tables to it. // Attempt to publish the subtree in a loop. // The loop is necessary because we might get redirected in the middle of publishing. - TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", uptr(migration), 0); + TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", + uptr(migration), 0); typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (oldRoot & ~ureg(1)); ureg subTreeEntriesPublished = 0; typename Details::Table* tableToReplace = migration->getSources()[0].table; @@ -164,7 +169,7 @@ public: // Otherwise, there will be a race between a subtree and its own children. // (If all ManualResetEvent objects supported isPublished(), we could add a TURF_TRACE counter for this. // In previous tests, such a counter does in fact get hit.) - tableToReplace->isPublished.wait(); + tableToReplace->isPublished.wait(); typename Details::Table* prevTable = NULL; for (;;) { publishLoop: @@ -173,23 +178,27 @@ public: // First, try to create a FlatTreeMigration with the necessary properties. // This will fail if an existing FlatTreeMigration has already been created using the same source. // In that case, we'll help complete the existing FlatTreeMigration, then we'll retry the loop. - TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", uptr(migration), 0); - typename Details::FlatTreeMigration* flatTreeMigration = Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift); + TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", + uptr(migration), 0); + typename Details::FlatTreeMigration* flatTreeMigration = + Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift); tableToReplace->jobCoordinator.runOne(flatTreeMigration); - flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible + flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible flatTree = flatTreeMigration->m_destination; // The FlatTreeMigration has already been GC'ed by the last worker. // Retry the loop. } else { ureg repeat = ureg(1) << (migration->m_safeShift - flatTree->safeShift); - ureg dstStartIndex = migration->m_baseHash >> flatTree->safeShift; + ureg dstStartIndex = ureg(migration->m_baseHash >> flatTree->safeShift); // The subtree we're about to publish fits inside the flattree. TURF_ASSERT(dstStartIndex + migration->m_numDestinations * repeat - 1 <= Hash(-1) >> flatTree->safeShift); // If a previous attempt to publish got redirected, resume publishing into the new flattree, - // starting with the first subtree entry that has not yet been fully published, as given by subTreeEntriesPublished. + // starting with the first subtree entry that has not yet been fully published, as given by + // subTreeEntriesPublished. // (Note: We could, in fact, restart the publish operation starting at entry 0. That would be valid too. // We are the only thread that can modify this particular range of the flattree at this time.) - turf::Atomic* dstLeaf = flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat); + turf::Atomic* dstLeaf = + flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat); typename Details::Table** subFlatTree = migration->getDestinations(); while (subTreeEntriesPublished < migration->m_numDestinations) { typename Details::Table* srcTable = subFlatTree[subTreeEntriesPublished]; @@ -199,15 +208,20 @@ public: if (ureg(probeTable) == Details::RedirectFlatTree) { // We've been redirected. // Help with the FlatTreeMigration, then try again. - TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), uptr(dstLeaf)); - typename Details::FlatTreeMigration* flatTreeMigration = Details::getExistingFlatTreeMigration(flatTree); + TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), + uptr(dstLeaf)); + typename Details::FlatTreeMigration* flatTreeMigration = + Details::getExistingFlatTreeMigration(flatTree); tableToReplace->jobCoordinator.runOne(flatTreeMigration); - flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible + flatTreeMigration->m_completed.wait(); + // flatTreeMigration->m_destination becomes entirely visible flatTree = flatTreeMigration->m_destination; goto publishLoop; } - // The only other possibility is that we were previously redirected, and the subtree entry got partially published. - TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", uptr(migration), 0); + // The only other possibility is that we were previously redirected, and the subtree entry got + // partially published. + TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", + uptr(migration), 0); TURF_ASSERT(probeTable == srcTable); } // The caller will GC the table) being replaced them since it's a source of the TableMigration. @@ -257,7 +271,8 @@ public: // Constructor: Find existing cell Mutator(ConcurrentMap_Grampa& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key)); + TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), + uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { if (!m_map.locateTable(m_table, m_sizeMask, hash)) @@ -267,7 +282,7 @@ public: return; m_value = m_cell->value.load(turf::Consume); if (m_value != Value(ValueTraits::Redirect)) - return; // Found an existing value + return; // Found an existing value // We've encountered a Redirect value. Help finish the migration. TURF_TRACE(ConcurrentMap_Grampa, 11, "[Mutator] find was redirected", uptr(m_table), 0); m_table->jobCoordinator.participate(); @@ -277,32 +292,33 @@ public: // Constructor: Insert cell Mutator(ConcurrentMap_Grampa& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key)); + TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), + uptr(key)); Hash hash = KeyTraits::hash(key); for (;;) { if (!m_map.locateTable(m_table, m_sizeMask, hash)) { m_map.createInitialTable(Details::MinTableSize); } else { ureg overflowIdx; - switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell - case Details::InsertResult_InsertedNew: { - // We've inserted a new cell. Don't load m_cell->value. - return; - } - case Details::InsertResult_AlreadyFound: { - // The hash was already found in the table. - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - // We've encountered a Redirect value. - TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); - break; // Help finish the migration. - } - return; // Found an existing value - } - case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); - break; + switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell + case Details::InsertResult_InsertedNew: { + // We've inserted a new cell. Don't load m_cell->value. + return; + } + case Details::InsertResult_AlreadyFound: { + // The hash was already found in the table. + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + // We've encountered a Redirect value. + TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value)); + break; // Help finish the migration. } + return; // Found an existing value + } + case Details::InsertResult_Overflow: { + Details::beginTableMigration(m_map, m_table, overflowIdx); + break; + } } // A migration has been started (either by us, or another thread). Participate until it's complete. m_table->jobCoordinator.participate(); @@ -319,22 +335,25 @@ public: Value exchangeValue(Value desired) { TURF_ASSERT(desired != Value(ValueTraits::NullValue)); - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); for (;;) { Value oldValue = m_value; if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { // Exchange was successful. Return previous value. - TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired)); + TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), + uptr(desired)); Value result = m_value; - m_value = desired; // Leave the mutator in a valid state + m_value = desired; // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", + uptr(m_table), uptr(m_value)); } // There was a racing write (or erase) to this cell. // Pretend we exchanged with ourselves, and just let the racing write win. @@ -354,25 +373,26 @@ public: TURF_UNUSED(exists); m_value = Value(ValueTraits::NullValue); ureg overflowIdx; - switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell + switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell case Details::InsertResult_AlreadyFound: m_value = m_cell->value.load(turf::Consume); if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + uptr(m_value)); break; } goto breakOuter; case Details::InsertResult_InsertedNew: goto breakOuter; case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx); + TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), + overflowIdx); Details::beginTableMigration(m_map, m_table, overflowIdx); break; } // We were redirected... again } - breakOuter: - ; + breakOuter:; // Try again in the new table. } } @@ -382,21 +402,22 @@ public: } Value eraseValue() { - TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_ASSERT(m_cell); // Cell must have been found or inserted TURF_TRACE(ConcurrentMap_Grampa, 21, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_value)); for (;;) { if (m_value == Value(ValueTraits::NullValue)) return m_value; - TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. + TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { // Exchange was successful and a non-NullValue value was erased and returned by reference in m_value. - TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop. + TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop. Value result = m_value; - m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state + m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state return result; } // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_value)); + TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + uptr(m_value)); if (m_value != Value(ValueTraits::Redirect)) { // There was a racing write (or erase) to this cell. // Pretend we erased nothing, and just let the racing write win. @@ -404,7 +425,7 @@ public: } // We've been redirected to a new table. TURF_TRACE(ConcurrentMap_Grampa, 23, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell)); - Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash + Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash for (;;) { // Help complete the migration. m_table->jobCoordinator.participate(); @@ -448,7 +469,7 @@ public: return Value(ValueTraits::NullValue); Value value = cell->value.load(turf::Consume); if (value != Value(ValueTraits::Redirect)) - return value; // Found an existing value + return value; // Found an existing value // We've been redirected to a new table. Help with the migration. TURF_TRACE(ConcurrentMap_Grampa, 26, "[get] was redirected", uptr(table), 0); table->jobCoordinator.participate(); @@ -511,21 +532,21 @@ public: void next() { TURF_ASSERT(m_table); - TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. + TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. for (;;) { searchInTable: m_idx++; if (m_idx <= m_table->sizeMask) { // Index still inside range of table. typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); - typename Details::Cell *cell = group->cells + (m_idx & 3); + typename Details::Cell* cell = group->cells + (m_idx & 3); m_hash = cell->hash.load(turf::Relaxed); if (m_hash != KeyTraits::NullHash) { // Cell has been reserved. m_value = cell->value.load(turf::Relaxed); TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); if (m_value != Value(ValueTraits::NullValue)) - return; // Yield this cell. + return; // Yield this cell. } } else { // We've advanced past the end of this table. @@ -537,7 +558,7 @@ public: // Found the next table. m_table = nextTable; m_idx = -1; - goto searchInTable; // Continue iterating in this table. + goto searchInTable; // Continue iterating in this table. } } }