From: Jeff Preshing Date: Mon, 29 Feb 2016 13:59:57 +0000 (-0500) Subject: Rename LeapFrog to Leapfrog X-Git-Url: http://plrg.eecs.uci.edu/git/?p=junction.git;a=commitdiff_plain;h=ebd740423cce85a1d1049f637d4e65b7d8465717;ds=sidebyside Rename LeapFrog to Leapfrog --- diff --git a/README.md b/README.md index 96d4dd2..b4361a0 100644 --- a/README.md +++ b/README.md @@ -2,7 +2,7 @@ Junction is a library of concurrent data structures in C++. It contains several junction::ConcurrentMap_Crude junction::ConcurrentMap_Linear - junction::ConcurrentMap_LeapFrog + junction::ConcurrentMap_Leapfrog junction::ConcurrentMap_Grampa [CMake](https://cmake.org/) and [Turf](https://github.com/preshing/turf) are required. See the blog post [New Concurrent Hash Maps for C++](http://preshing.com/20160201/new-concurrent-hash-maps-for-cpp/) for more information. @@ -103,7 +103,7 @@ A Junction map is a lot like a big array of `std::atomic<>` variables, where the * All of a Junction map's member functions, together with its `Mutator` member functions, are atomic with respect to each other, so you can safely call them from any thread without mutual exclusion. * If an `set` [happens before](http://preshing.com/20130702/the-happens-before-relation/) a `get` with the same key, the `get` will return the value set, except if another operation changes the value in between. Any [synchronizing operation](http://preshing.com/20130823/the-synchronizes-with-relation/) will establish this relationship. -* For Linear, LeapFrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed. +* For Linear, Leapfrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed. * In the current version, you must not set while concurrently using an `Iterator`. ## Feedback diff --git a/junction/ConcurrentMap_LeapFrog.cpp b/junction/ConcurrentMap_LeapFrog.cpp deleted file mode 100644 index 67a39ad..0000000 --- a/junction/ConcurrentMap_LeapFrog.cpp +++ /dev/null @@ -1,37 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - - Distributed under the Simplified BSD License. - Original location: https://github.com/preshing/junction - - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#include - -namespace junction { - -TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_LeapFrog, 17) // autogenerated by TidySource.py -TURF_TRACE_DEFINE("[Mutator] find constructor called") -TURF_TRACE_DEFINE("[Mutator] find was redirected") -TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called") -TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] called") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected") -TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect") -TURF_TRACE_DEFINE("[Mutator::eraseValue] called") -TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value") -TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected") -TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected") -TURF_TRACE_DEFINE("[get] called") -TURF_TRACE_DEFINE("[get] was redirected") -TURF_TRACE_DEFINE_END(ConcurrentMap_LeapFrog, 17) - -} // namespace junction diff --git a/junction/ConcurrentMap_LeapFrog.h b/junction/ConcurrentMap_LeapFrog.h deleted file mode 100644 index 2d0c0e2..0000000 --- a/junction/ConcurrentMap_LeapFrog.h +++ /dev/null @@ -1,343 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - - Distributed under the Simplified BSD License. - Original location: https://github.com/preshing/junction - - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H -#define JUNCTION_CONCURRENTMAP_LEAPFROG_H - -#include -#include -#include -#include -#include - -namespace junction { - -TURF_TRACE_DECLARE(ConcurrentMap_LeapFrog, 17) - -template , class VT = DefaultValueTraits > -class ConcurrentMap_LeapFrog { -public: - typedef K Key; - typedef V Value; - typedef KT KeyTraits; - typedef VT ValueTraits; - typedef typename turf::util::BestFit::Unsigned Hash; - typedef details::LeapFrog Details; - -private: - turf::Atomic m_root; - -public: - ConcurrentMap_LeapFrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) { - } - - ~ConcurrentMap_LeapFrog() { - typename Details::Table* table = m_root.loadNonatomic(); - table->destroy(); - } - - // publishTableMigration() is called by exactly one thread from Details::TableMigration::run() - // after all the threads participating in the migration have completed their work. - void publishTableMigration(typename Details::TableMigration* migration) { - // There are no racing calls to this function. - typename Details::Table* oldRoot = m_root.loadNonatomic(); - m_root.store(migration->m_destination, turf::Release); - TURF_ASSERT(oldRoot == migration->getSources()[0].table); - // Caller will GC the TableMigration and the source table. - } - - // A Mutator represents a known cell in the hash table. - // It's meant for manipulations within a temporary function scope. - // Obviously you must not call QSBR::Update while holding a Mutator. - // Any operation that modifies the table (exchangeValue, eraseValue) - // may be forced to follow a redirected cell, which changes the Mutator itself. - // Note that even if the Mutator was constructed from an existing cell, - // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted, - // or if another thread deletes the key between the two steps. - class Mutator { - private: - friend class ConcurrentMap_LeapFrog; - - ConcurrentMap_LeapFrog& m_map; - typename Details::Table* m_table; - typename Details::Cell* m_cell; - Value m_value; - - // Constructor: Find existing cell - Mutator(ConcurrentMap_LeapFrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key)); - Hash hash = KeyTraits::hash(key); - for (;;) { - m_table = m_map.m_root.load(turf::Consume); - m_cell = Details::find(hash, m_table); - if (!m_cell) - return; - m_value = m_cell->value.load(turf::Consume); - if (m_value != Value(ValueTraits::Redirect)) - return; // Found an existing value - // We've encountered a Redirect value. Help finish the migration. - TURF_TRACE(ConcurrentMap_LeapFrog, 1, "[Mutator] find was redirected", uptr(m_table), 0); - m_table->jobCoordinator.participate(); - // Try again using the latest root. - } - } - - // Constructor: Insert or find cell - Mutator(ConcurrentMap_LeapFrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key)); - Hash hash = KeyTraits::hash(key); - for (;;) { - m_table = m_map.m_root.load(turf::Consume); - ureg overflowIdx; - switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell - case Details::InsertResult_InsertedNew: { - // We've inserted a new cell. Don't load m_cell->value. - return; - } - case Details::InsertResult_AlreadyFound: { - // The hash was already found in the table. - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - // We've encountered a Redirect value. - TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value)); - break; // Help finish the migration. - } - return; // Found an existing value - } - case Details::InsertResult_Overflow: { - Details::beginTableMigration(m_map, m_table, overflowIdx); - break; - } - } - // A migration has been started (either by us, or another thread). Participate until it's complete. - m_table->jobCoordinator.participate(); - // Try again using the latest root. - } - } - - public: - Value getValue() const { - // Return previously loaded value. Don't load it again. - return Value(m_value); - } - - Value exchangeValue(Value desired) { - TURF_ASSERT(desired != Value(ValueTraits::NullValue)); - TURF_ASSERT(desired != Value(ValueTraits::Redirect)); - TURF_ASSERT(m_cell); // Cell must have been found or inserted - TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); - for (;;) { - Value oldValue = m_value; - if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { - // Exchange was successful. Return previous value. - TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), - uptr(desired)); - Value result = m_value; - m_value = desired; // Leave the mutator in a valid state - return result; - } - // The CAS failed and m_value has been updated with the latest value. - if (m_value != Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), - uptr(m_value)); - if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value", - uptr(m_table), uptr(m_value)); - } - // There was a racing write (or erase) to this cell. - // Pretend we exchanged with ourselves, and just let the racing write win. - return desired; - } - // We've encountered a Redirect value. Help finish the migration. - TURF_TRACE(ConcurrentMap_LeapFrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value)); - Hash hash = m_cell->hash.load(turf::Relaxed); - for (;;) { - // Help complete the migration. - m_table->jobCoordinator.participate(); - // Try again in the new table. - m_table = m_map.m_root.load(turf::Consume); - m_value = Value(ValueTraits::NullValue); - ureg overflowIdx; - switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell - case Details::InsertResult_AlreadyFound: - m_value = m_cell->value.load(turf::Consume); - if (m_value == Value(ValueTraits::Redirect)) { - TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), - uptr(m_value)); - break; - } - goto breakOuter; - case Details::InsertResult_InsertedNew: - goto breakOuter; - case Details::InsertResult_Overflow: - TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), - overflowIdx); - Details::beginTableMigration(m_map, m_table, overflowIdx); - break; - } - // We were redirected... again - } - breakOuter:; - // Try again in the new table. - } - } - - void setValue(Value desired) { - exchangeValue(desired); - } - - Value eraseValue() { - TURF_ASSERT(m_cell); // Cell must have been found or inserted - TURF_TRACE(ConcurrentMap_LeapFrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell)); - for (;;) { - if (m_value == Value(ValueTraits::NullValue)) - return Value(m_value); - TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. - if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { - // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. - TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. - Value result = m_value; - m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state - return result; - } - // The CAS failed and m_value has been updated with the latest value. - TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), - uptr(m_cell)); - if (m_value != Value(ValueTraits::Redirect)) { - // There was a racing write (or erase) to this cell. - // Pretend we erased nothing, and just let the racing write win. - return Value(ValueTraits::NullValue); - } - // We've been redirected to a new table. - TURF_TRACE(ConcurrentMap_LeapFrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell)); - Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash - for (;;) { - // Help complete the migration. - m_table->jobCoordinator.participate(); - // Try again in the new table. - m_table = m_map.m_root.load(turf::Consume); - m_cell = Details::find(hash, m_table); - if (!m_cell) { - m_value = Value(ValueTraits::NullValue); - return m_value; - } - m_value = m_cell->value.load(turf::Relaxed); - if (m_value != Value(ValueTraits::Redirect)) - break; - TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), - uptr(m_cell)); - } - } - } - }; - - Mutator insertOrFind(Key key) { - return Mutator(*this, key); - } - - Mutator find(Key key) { - return Mutator(*this, key, false); - } - - // Lookup without creating a temporary Mutator. - Value get(Key key) { - Hash hash = KeyTraits::hash(key); - TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(this), uptr(hash)); - for (;;) { - typename Details::Table* table = m_root.load(turf::Consume); - typename Details::Cell* cell = Details::find(hash, table); - if (!cell) - return Value(ValueTraits::NullValue); - Value value = cell->value.load(turf::Consume); - if (value != Value(ValueTraits::Redirect)) - return value; // Found an existing value - // We've been redirected to a new table. Help with the migration. - TURF_TRACE(ConcurrentMap_LeapFrog, 16, "[get] was redirected", uptr(table), uptr(hash)); - table->jobCoordinator.participate(); - // Try again in the new table. - } - } - - Value set(Key key, Value desired) { - Mutator iter(*this, key); - return iter.exchangeValue(desired); - } - - Value exchange(Key key, Value desired) { - Mutator iter(*this, key); - return iter.exchangeValue(desired); - } - - Value erase(Key key) { - Mutator iter(*this, key, false); - return iter.eraseValue(); - } - - // The easiest way to implement an Iterator is to prevent all Redirects. - // The currrent Iterator does that by forbidding concurrent inserts. - // To make it work with concurrent inserts, we'd need a way to block TableMigrations. - class Iterator { - private: - typename Details::Table* m_table; - ureg m_idx; - Key m_hash; - Value m_value; - - public: - Iterator(ConcurrentMap_LeapFrog& map) { - // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: - m_table = map.m_root.load(turf::Consume); - m_idx = -1; - next(); - } - - void next() { - TURF_ASSERT(m_table); - TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. - while (++m_idx <= m_table->sizeMask) { - // Index still inside range of table. - typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); - typename Details::Cell* cell = group->cells + (m_idx & 3); - m_hash = cell->hash.load(turf::Relaxed); - if (m_hash != KeyTraits::NullHash) { - // Cell has been reserved. - m_value = cell->value.load(turf::Relaxed); - TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); - if (m_value != Value(ValueTraits::NullValue)) - return; // Yield this cell. - } - } - // That's the end of the map. - m_hash = KeyTraits::NullHash; - m_value = Value(ValueTraits::NullValue); - } - - bool isValid() const { - return m_value != Value(ValueTraits::NullValue); - } - - Key getKey() const { - TURF_ASSERT(isValid()); - // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: - return KeyTraits::dehash(m_hash); - } - - Value getValue() const { - TURF_ASSERT(isValid()); - return m_value; - } - }; -}; - -} // namespace junction - -#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H diff --git a/junction/ConcurrentMap_Leapfrog.cpp b/junction/ConcurrentMap_Leapfrog.cpp new file mode 100644 index 0000000..e4aa4b7 --- /dev/null +++ b/junction/ConcurrentMap_Leapfrog.cpp @@ -0,0 +1,37 @@ +/*------------------------------------------------------------------------ + Junction: Concurrent data structures in C++ + Copyright (c) 2016 Jeff Preshing + + Distributed under the Simplified BSD License. + Original location: https://github.com/preshing/junction + + This software is distributed WITHOUT ANY WARRANTY; without even the + implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the LICENSE file for more information. +------------------------------------------------------------------------*/ + +#include + +namespace junction { + +TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Leapfrog, 17) // autogenerated by TidySource.py +TURF_TRACE_DEFINE("[Mutator] find constructor called") +TURF_TRACE_DEFINE("[Mutator] find was redirected") +TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called") +TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] called") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected") +TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect") +TURF_TRACE_DEFINE("[Mutator::eraseValue] called") +TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value") +TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected") +TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected") +TURF_TRACE_DEFINE("[get] called") +TURF_TRACE_DEFINE("[get] was redirected") +TURF_TRACE_DEFINE_END(ConcurrentMap_Leapfrog, 17) + +} // namespace junction diff --git a/junction/ConcurrentMap_Leapfrog.h b/junction/ConcurrentMap_Leapfrog.h new file mode 100644 index 0000000..89ba120 --- /dev/null +++ b/junction/ConcurrentMap_Leapfrog.h @@ -0,0 +1,343 @@ +/*------------------------------------------------------------------------ + Junction: Concurrent data structures in C++ + Copyright (c) 2016 Jeff Preshing + + Distributed under the Simplified BSD License. + Original location: https://github.com/preshing/junction + + This software is distributed WITHOUT ANY WARRANTY; without even the + implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the LICENSE file for more information. +------------------------------------------------------------------------*/ + +#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H +#define JUNCTION_CONCURRENTMAP_LEAPFROG_H + +#include +#include +#include +#include +#include + +namespace junction { + +TURF_TRACE_DECLARE(ConcurrentMap_Leapfrog, 17) + +template , class VT = DefaultValueTraits > +class ConcurrentMap_Leapfrog { +public: + typedef K Key; + typedef V Value; + typedef KT KeyTraits; + typedef VT ValueTraits; + typedef typename turf::util::BestFit::Unsigned Hash; + typedef details::Leapfrog Details; + +private: + turf::Atomic m_root; + +public: + ConcurrentMap_Leapfrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) { + } + + ~ConcurrentMap_Leapfrog() { + typename Details::Table* table = m_root.loadNonatomic(); + table->destroy(); + } + + // publishTableMigration() is called by exactly one thread from Details::TableMigration::run() + // after all the threads participating in the migration have completed their work. + void publishTableMigration(typename Details::TableMigration* migration) { + // There are no racing calls to this function. + typename Details::Table* oldRoot = m_root.loadNonatomic(); + m_root.store(migration->m_destination, turf::Release); + TURF_ASSERT(oldRoot == migration->getSources()[0].table); + // Caller will GC the TableMigration and the source table. + } + + // A Mutator represents a known cell in the hash table. + // It's meant for manipulations within a temporary function scope. + // Obviously you must not call QSBR::Update while holding a Mutator. + // Any operation that modifies the table (exchangeValue, eraseValue) + // may be forced to follow a redirected cell, which changes the Mutator itself. + // Note that even if the Mutator was constructed from an existing cell, + // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted, + // or if another thread deletes the key between the two steps. + class Mutator { + private: + friend class ConcurrentMap_Leapfrog; + + ConcurrentMap_Leapfrog& m_map; + typename Details::Table* m_table; + typename Details::Cell* m_cell; + Value m_value; + + // Constructor: Find existing cell + Mutator(ConcurrentMap_Leapfrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) { + TURF_TRACE(ConcurrentMap_Leapfrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key)); + Hash hash = KeyTraits::hash(key); + for (;;) { + m_table = m_map.m_root.load(turf::Consume); + m_cell = Details::find(hash, m_table); + if (!m_cell) + return; + m_value = m_cell->value.load(turf::Consume); + if (m_value != Value(ValueTraits::Redirect)) + return; // Found an existing value + // We've encountered a Redirect value. Help finish the migration. + TURF_TRACE(ConcurrentMap_Leapfrog, 1, "[Mutator] find was redirected", uptr(m_table), 0); + m_table->jobCoordinator.participate(); + // Try again using the latest root. + } + } + + // Constructor: Insert or find cell + Mutator(ConcurrentMap_Leapfrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) { + TURF_TRACE(ConcurrentMap_Leapfrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key)); + Hash hash = KeyTraits::hash(key); + for (;;) { + m_table = m_map.m_root.load(turf::Consume); + ureg overflowIdx; + switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell + case Details::InsertResult_InsertedNew: { + // We've inserted a new cell. Don't load m_cell->value. + return; + } + case Details::InsertResult_AlreadyFound: { + // The hash was already found in the table. + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + // We've encountered a Redirect value. + TURF_TRACE(ConcurrentMap_Leapfrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value)); + break; // Help finish the migration. + } + return; // Found an existing value + } + case Details::InsertResult_Overflow: { + Details::beginTableMigration(m_map, m_table, overflowIdx); + break; + } + } + // A migration has been started (either by us, or another thread). Participate until it's complete. + m_table->jobCoordinator.participate(); + // Try again using the latest root. + } + } + + public: + Value getValue() const { + // Return previously loaded value. Don't load it again. + return Value(m_value); + } + + Value exchangeValue(Value desired) { + TURF_ASSERT(desired != Value(ValueTraits::NullValue)); + TURF_ASSERT(desired != Value(ValueTraits::Redirect)); + TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_TRACE(ConcurrentMap_Leapfrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value)); + for (;;) { + Value oldValue = m_value; + if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) { + // Exchange was successful. Return previous value. + TURF_TRACE(ConcurrentMap_Leapfrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), + uptr(desired)); + Value result = m_value; + m_value = desired; // Leave the mutator in a valid state + return result; + } + // The CAS failed and m_value has been updated with the latest value. + if (m_value != Value(ValueTraits::Redirect)) { + TURF_TRACE(ConcurrentMap_Leapfrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), + uptr(m_value)); + if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) { + TURF_TRACE(ConcurrentMap_Leapfrog, 7, "[Mutator::exchangeValue] racing write inserted new value", + uptr(m_table), uptr(m_value)); + } + // There was a racing write (or erase) to this cell. + // Pretend we exchanged with ourselves, and just let the racing write win. + return desired; + } + // We've encountered a Redirect value. Help finish the migration. + TURF_TRACE(ConcurrentMap_Leapfrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value)); + Hash hash = m_cell->hash.load(turf::Relaxed); + for (;;) { + // Help complete the migration. + m_table->jobCoordinator.participate(); + // Try again in the new table. + m_table = m_map.m_root.load(turf::Consume); + m_value = Value(ValueTraits::NullValue); + ureg overflowIdx; + switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell + case Details::InsertResult_AlreadyFound: + m_value = m_cell->value.load(turf::Consume); + if (m_value == Value(ValueTraits::Redirect)) { + TURF_TRACE(ConcurrentMap_Leapfrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), + uptr(m_value)); + break; + } + goto breakOuter; + case Details::InsertResult_InsertedNew: + goto breakOuter; + case Details::InsertResult_Overflow: + TURF_TRACE(ConcurrentMap_Leapfrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), + overflowIdx); + Details::beginTableMigration(m_map, m_table, overflowIdx); + break; + } + // We were redirected... again + } + breakOuter:; + // Try again in the new table. + } + } + + void setValue(Value desired) { + exchangeValue(desired); + } + + Value eraseValue() { + TURF_ASSERT(m_cell); // Cell must have been found or inserted + TURF_TRACE(ConcurrentMap_Leapfrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell)); + for (;;) { + if (m_value == Value(ValueTraits::NullValue)) + return Value(m_value); + TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted. + if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) { + // Exchange was successful and a non-NULL value was erased and returned by reference in m_value. + TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop. + Value result = m_value; + m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state + return result; + } + // The CAS failed and m_value has been updated with the latest value. + TURF_TRACE(ConcurrentMap_Leapfrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table), + uptr(m_cell)); + if (m_value != Value(ValueTraits::Redirect)) { + // There was a racing write (or erase) to this cell. + // Pretend we erased nothing, and just let the racing write win. + return Value(ValueTraits::NullValue); + } + // We've been redirected to a new table. + TURF_TRACE(ConcurrentMap_Leapfrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell)); + Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash + for (;;) { + // Help complete the migration. + m_table->jobCoordinator.participate(); + // Try again in the new table. + m_table = m_map.m_root.load(turf::Consume); + m_cell = Details::find(hash, m_table); + if (!m_cell) { + m_value = Value(ValueTraits::NullValue); + return m_value; + } + m_value = m_cell->value.load(turf::Relaxed); + if (m_value != Value(ValueTraits::Redirect)) + break; + TURF_TRACE(ConcurrentMap_Leapfrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table), + uptr(m_cell)); + } + } + } + }; + + Mutator insertOrFind(Key key) { + return Mutator(*this, key); + } + + Mutator find(Key key) { + return Mutator(*this, key, false); + } + + // Lookup without creating a temporary Mutator. + Value get(Key key) { + Hash hash = KeyTraits::hash(key); + TURF_TRACE(ConcurrentMap_Leapfrog, 15, "[get] called", uptr(this), uptr(hash)); + for (;;) { + typename Details::Table* table = m_root.load(turf::Consume); + typename Details::Cell* cell = Details::find(hash, table); + if (!cell) + return Value(ValueTraits::NullValue); + Value value = cell->value.load(turf::Consume); + if (value != Value(ValueTraits::Redirect)) + return value; // Found an existing value + // We've been redirected to a new table. Help with the migration. + TURF_TRACE(ConcurrentMap_Leapfrog, 16, "[get] was redirected", uptr(table), uptr(hash)); + table->jobCoordinator.participate(); + // Try again in the new table. + } + } + + Value set(Key key, Value desired) { + Mutator iter(*this, key); + return iter.exchangeValue(desired); + } + + Value exchange(Key key, Value desired) { + Mutator iter(*this, key); + return iter.exchangeValue(desired); + } + + Value erase(Key key) { + Mutator iter(*this, key, false); + return iter.eraseValue(); + } + + // The easiest way to implement an Iterator is to prevent all Redirects. + // The currrent Iterator does that by forbidding concurrent inserts. + // To make it work with concurrent inserts, we'd need a way to block TableMigrations. + class Iterator { + private: + typename Details::Table* m_table; + ureg m_idx; + Key m_hash; + Value m_value; + + public: + Iterator(ConcurrentMap_Leapfrog& map) { + // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: + m_table = map.m_root.load(turf::Consume); + m_idx = -1; + next(); + } + + void next() { + TURF_ASSERT(m_table); + TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating. + while (++m_idx <= m_table->sizeMask) { + // Index still inside range of table. + typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2); + typename Details::Cell* cell = group->cells + (m_idx & 3); + m_hash = cell->hash.load(turf::Relaxed); + if (m_hash != KeyTraits::NullHash) { + // Cell has been reserved. + m_value = cell->value.load(turf::Relaxed); + TURF_ASSERT(m_value != Value(ValueTraits::Redirect)); + if (m_value != Value(ValueTraits::NullValue)) + return; // Yield this cell. + } + } + // That's the end of the map. + m_hash = KeyTraits::NullHash; + m_value = Value(ValueTraits::NullValue); + } + + bool isValid() const { + return m_value != Value(ValueTraits::NullValue); + } + + Key getKey() const { + TURF_ASSERT(isValid()); + // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead: + return KeyTraits::dehash(m_hash); + } + + Value getValue() const { + TURF_ASSERT(isValid()); + return m_value; + } + }; +}; + +} // namespace junction + +#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H diff --git a/junction/details/LeapFrog.cpp b/junction/details/LeapFrog.cpp deleted file mode 100644 index 7cb9452..0000000 --- a/junction/details/LeapFrog.cpp +++ /dev/null @@ -1,57 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - - Distributed under the Simplified BSD License. - Original location: https://github.com/preshing/junction - - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#include -#include -#include - -namespace junction { -namespace details { - -TURF_TRACE_DEFINE_BEGIN(LeapFrog, 33) // autogenerated by TidySource.py -TURF_TRACE_DEFINE("[find] called") -TURF_TRACE_DEFINE("[find] found existing cell optimistically") -TURF_TRACE_DEFINE("[find] found existing cell") -TURF_TRACE_DEFINE("[insertOrFind] called") -TURF_TRACE_DEFINE("[insertOrFind] reserved first cell") -TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell") -TURF_TRACE_DEFINE("[insertOrFind] found in first cell") -TURF_TRACE_DEFINE("[insertOrFind] race to read hash") -TURF_TRACE_DEFINE("[insertOrFind] found in probe chain") -TURF_TRACE_DEFINE("[insertOrFind] reserved cell") -TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell") -TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain") -TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket") -TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell") -TURF_TRACE_DEFINE("[insertOrFind] overflow") -TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") -TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") -TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") -TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") -TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") -TURF_TRACE_DEFINE("[migrateRange] race to insert key") -TURF_TRACE_DEFINE("[migrateRange] race to insert value") -TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect") -TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected") -TURF_TRACE_DEFINE("[migrateRange] racing update was erase") -TURF_TRACE_DEFINE("[migrateRange] race to update migrated value") -TURF_TRACE_DEFINE("[TableMigration::run] already ended") -TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set") -TURF_TRACE_DEFINE("[TableMigration::run] destination overflow") -TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed") -TURF_TRACE_DEFINE("[TableMigration::run] out of migration units") -TURF_TRACE_DEFINE("[TableMigration::run] not the last worker") -TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started") -TURF_TRACE_DEFINE_END(LeapFrog, 33) - -} // namespace details -} // namespace junction diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h deleted file mode 100644 index bc184f9..0000000 --- a/junction/details/LeapFrog.h +++ /dev/null @@ -1,577 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - - Distributed under the Simplified BSD License. - Original location: https://github.com/preshing/junction - - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#ifndef JUNCTION_DETAILS_LEAPFROG_H -#define JUNCTION_DETAILS_LEAPFROG_H - -#include -#include -#include -#include -#include -#include -#include -#include -#include -#include - -// Enable this to force migration overflows (for test purposes): -#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0 - -namespace junction { -namespace details { - -TURF_TRACE_DECLARE(LeapFrog, 33) - -template -struct LeapFrog { - typedef typename Map::Hash Hash; - typedef typename Map::Value Value; - typedef typename Map::KeyTraits KeyTraits; - typedef typename Map::ValueTraits ValueTraits; - - static const ureg InitialSize = 8; - static const ureg TableMigrationUnitSize = 32; - static const ureg LinearSearchLimit = 128; - static const ureg CellsInUseSample = LinearSearchLimit; - TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links - TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain - - struct Cell { - turf::Atomic hash; - turf::Atomic value; - }; - - struct CellGroup { - // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain. - // Each cell in the probe chain is located within the table itself. - // "deltas" determines the index of the next cell in the probe chain. - // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket. - // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket. - // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket. - turf::Atomic deltas[8]; - Cell cells[4]; - }; - - struct Table { - const ureg sizeMask; // a power of two minus one - turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) - SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration - - Table(ureg sizeMask) : sizeMask(sizeMask) { - } - - static Table* create(ureg tableSize) { - TURF_ASSERT(turf::util::isPowerOf2(tableSize)); - TURF_ASSERT(tableSize >= 4); - ureg numGroups = tableSize >> 2; - Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups); - new (table) Table(tableSize - 1); - for (ureg i = 0; i < numGroups; i++) { - CellGroup* group = table->getCellGroups() + i; - for (ureg j = 0; j < 4; j++) { - group->deltas[j].storeNonatomic(0); - group->deltas[j + 4].storeNonatomic(0); - group->cells[j].hash.storeNonatomic(KeyTraits::NullHash); - group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue)); - } - } - return table; - } - - void destroy() { - this->Table::~Table(); - TURF_HEAP.free(this); - } - - CellGroup* getCellGroups() const { - return (CellGroup*) (this + 1); - } - - ureg getNumMigrationUnits() const { - return sizeMask / TableMigrationUnitSize + 1; - } - }; - - class TableMigration : public SimpleJobCoordinator::Job { - public: - struct Source { - Table* table; - turf::Atomic sourceIndex; - }; - - Map& m_map; - Table* m_destination; - turf::Atomic m_workerStatus; // number of workers + end flag - turf::Atomic m_overflowed; - turf::Atomic m_unitsRemaining; - ureg m_numSources; - - TableMigration(Map& map) : m_map(map) { - } - - static TableMigration* create(Map& map, ureg numSources) { - TableMigration* migration = - (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); - new (migration) TableMigration(map); - migration->m_workerStatus.storeNonatomic(0); - migration->m_overflowed.storeNonatomic(false); - migration->m_unitsRemaining.storeNonatomic(0); - migration->m_numSources = numSources; - // Caller is responsible for filling in sources & destination - return migration; - } - - virtual ~TableMigration() TURF_OVERRIDE { - } - - void destroy() { - // Destroy all source tables. - for (ureg i = 0; i < m_numSources; i++) - if (getSources()[i].table) - getSources()[i].table->destroy(); - // Delete the migration object itself. - this->TableMigration::~TableMigration(); - TURF_HEAP.free(this); - } - - Source* getSources() const { - return (Source*) (this + 1); - } - - bool migrateRange(Table* srcTable, ureg startIdx); - virtual void run() TURF_OVERRIDE; - }; - - static Cell* find(Hash hash, Table* table) { - TURF_TRACE(LeapFrog, 0, "[find] called", uptr(table), hash); - TURF_ASSERT(table); - TURF_ASSERT(hash != KeyTraits::NullHash); - ureg sizeMask = table->sizeMask; - // Optimistically check hashed cell even though it might belong to another bucket - ureg idx = hash & sizeMask; - CellGroup* group = table->getCellGroups() + (idx >> 2); - Cell* cell = group->cells + (idx & 3); - Hash probeHash = cell->hash.load(turf::Relaxed); - if (probeHash == hash) { - TURF_TRACE(LeapFrog, 1, "[find] found existing cell optimistically", uptr(table), idx); - return cell; - } else if (probeHash == KeyTraits::NullHash) { - return cell = NULL; - } - // Follow probe chain for our bucket - u8 delta = group->deltas[idx & 3].load(turf::Relaxed); - while (delta) { - idx = (idx + delta) & sizeMask; - group = table->getCellGroups() + (idx >> 2); - cell = group->cells + (idx & 3); - Hash probeHash = cell->hash.load(turf::Relaxed); - // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert, - // but we don't check for it. We just follow the probe chain. - if (probeHash == hash) { - TURF_TRACE(LeapFrog, 2, "[find] found existing cell", uptr(table), idx); - return cell; - } - delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed); - } - // End of probe chain, not found - return NULL; - } - - // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. - enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; - static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) { - TURF_TRACE(LeapFrog, 3, "[insertOrFind] called", uptr(table), hash); - TURF_ASSERT(table); - TURF_ASSERT(hash != KeyTraits::NullHash); - ureg sizeMask = table->sizeMask; - ureg idx = ureg(hash); - - // Check hashed cell first, though it may not even belong to the bucket. - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - cell = group->cells + (idx & 3); - Hash probeHash = cell->hash.load(turf::Relaxed); - if (probeHash == KeyTraits::NullHash) { - if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) { - TURF_TRACE(LeapFrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx); - // There are no links to set. We're done. - return InsertResult_InsertedNew; - } else { - TURF_TRACE(LeapFrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx); - // Fall through to check if it was the same hash... - } - } - if (probeHash == hash) { - TURF_TRACE(LeapFrog, 6, "[insertOrFind] found in first cell", uptr(table), idx); - return InsertResult_AlreadyFound; - } - - // Follow the link chain for this bucket. - ureg maxIdx = idx + sizeMask; - ureg linkLevel = 0; - turf::Atomic* prevLink; - for (;;) { - followLink: - prevLink = group->deltas + ((idx & 3) + linkLevel); - linkLevel = 4; - u8 probeDelta = prevLink->load(turf::Relaxed); - if (probeDelta) { - idx += probeDelta; - // Check the hash for this cell. - group = table->getCellGroups() + ((idx & sizeMask) >> 2); - cell = group->cells + (idx & 3); - probeHash = cell->hash.load(turf::Relaxed); - if (probeHash == KeyTraits::NullHash) { - // Cell was linked, but hash is not visible yet. - // We could avoid this case (and guarantee it's visible) using acquire & release, but instead, - // just poll until it becomes visible. - TURF_TRACE(LeapFrog, 7, "[insertOrFind] race to read hash", uptr(table), idx); - do { - probeHash = cell->hash.load(turf::Acquire); - } while (probeHash == KeyTraits::NullHash); - } - TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked - if (probeHash == hash) { - TURF_TRACE(LeapFrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx); - return InsertResult_AlreadyFound; - } - } else { - // Reached the end of the link chain for this bucket. - // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket. - ureg prevLinkIdx = idx; - TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. - ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit); - while (linearProbesRemaining-- > 0) { - idx++; - group = table->getCellGroups() + ((idx & sizeMask) >> 2); - cell = group->cells + (idx & 3); - probeHash = cell->hash.load(turf::Relaxed); - if (probeHash == KeyTraits::NullHash) { - // It's an empty cell. Try to reserve it. - if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) { - // Success. We've reserved the cell. Link it to previous cell in same bucket. - TURF_TRACE(LeapFrog, 9, "[insertOrFind] reserved cell", uptr(table), idx); - TURF_ASSERT(probeDelta == 0); - u8 desiredDelta = idx - prevLinkIdx; -#if TURF_WITH_ASSERTS - probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); - TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); -#else - prevLink->store(desiredDelta, turf::Relaxed); -#endif - return InsertResult_InsertedNew; - } else { - TURF_TRACE(LeapFrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx); - // Fall through to check if it's the same hash... - } - } - Hash x = (probeHash ^ hash); - // Check for same hash. - if (!x) { - TURF_TRACE(LeapFrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx); - return InsertResult_AlreadyFound; - } - // Check for same bucket. - if ((x & sizeMask) == 0) { - TURF_TRACE(LeapFrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx); - // Attempt to set the link on behalf of the late-arriving cell. - // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here, - // there's no guarantee that our own link chain will be well-formed by the time this function returns. - // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.) - u8 desiredDelta = idx - prevLinkIdx; -#if TURF_WITH_ASSERTS - probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); - TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); - if (probeDelta == 0) - TURF_TRACE(LeapFrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx); -#else - prevLink->store(desiredDelta, turf::Relaxed); -#endif - goto followLink; // Try to follow link chain for the bucket again. - } - // Continue linear search... - } - // Table is too full to insert. - overflowIdx = idx + 1; - TURF_TRACE(LeapFrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx); - return InsertResult_Overflow; - } - } - } - - static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) { - // Create new migration by DCLI. - TURF_TRACE(LeapFrog, 15, "[beginTableMigrationToSize] called", 0, 0); - SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume(); - if (job) { - TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0); - } else { - turf::LockGuard guard(table->mutex); - job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. - if (job) { - TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0); - } else { - // Create new migration. - TableMigration* migration = TableMigration::create(map, 1); - migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits()); - migration->getSources()[0].table = table; - migration->getSources()[0].sourceIndex.storeNonatomic(0); - migration->m_destination = Table::create(nextTableSize); - // Publish the new migration. - table->jobCoordinator.storeRelease(migration); - } - } - } - - static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { - // Estimate number of cells in use based on a small sample. - ureg sizeMask = table->sizeMask; - ureg idx = overflowIdx - CellsInUseSample; - ureg inUseCells = 0; - for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { - CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); - Cell* cell = group->cells + (idx & 3); - Value value = cell->value.load(turf::Relaxed); - if (value == Value(ValueTraits::Redirect)) { - // Another thread kicked off the jobCoordinator. The caller will participate upon return. - TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0); - return; - } - if (value != Value(ValueTraits::NullValue)) - inUseCells++; - idx++; - } - float inUseRatio = float(inUseCells) / CellsInUseSample; - float estimatedInUse = (sizeMask + 1) * inUseRatio; -#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS - // Periodically underestimate the number of cells in use. - // This exercises the code that handles overflow during migration. - static ureg counter = 1; - if ((++counter & 3) == 0) { - estimatedInUse /= 4; - } -#endif - ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); - beginTableMigrationToSize(map, table, nextTableSize); - } -}; // LeapFrog - -template -bool LeapFrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { - ureg srcSizeMask = srcTable->sizeMask; - ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1); - // Iterate over source range. - for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) { - CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2); - Cell* srcCell = srcGroup->cells + (srcIdx & 3); - Hash srcHash; - Value srcValue; - // Fetch the srcHash and srcValue. - for (;;) { - srcHash = srcCell->hash.load(turf::Relaxed); - if (srcHash == KeyTraits::NullHash) { - // An unused cell. Try to put a Redirect marker in its value. - srcValue = - srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); - if (srcValue == Value(ValueTraits::Redirect)) { - // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); - break; - } - if (srcValue == Value(ValueTraits::NullValue)) - break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); - // Otherwise, somebody just claimed the cell. Read srcHash again... - } else { - // Check for deleted/uninitialized value. - srcValue = srcCell->value.load(turf::Relaxed); - if (srcValue == Value(ValueTraits::NullValue)) { - // Try to put a Redirect marker. - if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) - break; // Redirect has been placed. Break inner loop, continue outer loop. - TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); - if (srcValue == Value(ValueTraits::Redirect)) { - // FIXME: I don't think this will happen. Investigate & change to assert - TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); - break; - } - } else if (srcValue == Value(ValueTraits::Redirect)) { - // srcValue is already marked Redirect due to previous incomplete migration. - TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); - break; - } - - // We've got a key/value pair to migrate. - // Reserve a destination cell in the destination. - TURF_ASSERT(srcHash != KeyTraits::NullHash); - TURF_ASSERT(srcValue != Value(ValueTraits::NullValue)); - TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); - Cell* dstCell; - ureg overflowIdx; - InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx); - // During migration, a hash can only exist in one place among all the source tables, - // and it is only migrated by one thread. Therefore, the hash will never already exist - // in the destination table: - TURF_ASSERT(result != InsertResult_AlreadyFound); - if (result == InsertResult_Overflow) { - // Destination overflow. - // This can happen for several reasons. For example, the source table could have - // existed of all deleted cells when it overflowed, resulting in a small destination - // table size, but then another thread could re-insert all the same hashes - // before the migration completed. - // Caller will cancel the current migration and begin a new one. - return false; - } - // Migrate the old value to the new cell. - for (;;) { - // Copy srcValue to the destination. - dstCell->value.store(srcValue, turf::Relaxed); - // Try to place a Redirect marker in srcValue. - Value doubleCheckedSrcValue = - srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); - TURF_ASSERT(doubleCheckedSrcValue != - Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. - if (doubleCheckedSrcValue == srcValue) { - // No racing writes to the src. We've successfully placed the Redirect marker. - // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL - // by a late-arriving erase. - if (srcValue == Value(ValueTraits::NullValue)) - TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); - break; - } - // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. - TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); - srcValue = doubleCheckedSrcValue; - } - // Cell successfully migrated. Proceed to next source cell. - break; - } - } - } - // Range has been migrated successfully. - return true; -} - -template -void LeapFrog::TableMigration::run() { - // Conditionally increment the shared # of workers. - ureg probeStatus = m_workerStatus.load(turf::Relaxed); - do { - if (probeStatus & 1) { - // End flag is already set, so do nothing. - TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0); - return; - } - } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); - // # of workers has been incremented, and the end flag is clear. - TURF_ASSERT((probeStatus & 1) == 0); - - // Iterate over all source tables. - for (ureg s = 0; s < m_numSources; s++) { - Source& source = getSources()[s]; - // Loop over all migration units in this source table. - for (;;) { - if (m_workerStatus.load(turf::Relaxed) & 1) { - TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); - goto endMigration; - } - ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); - if (startIdx >= source.table->sizeMask + 1) - break; // No more migration units in this table. Try next source table. - bool overflowed = !migrateRange(source.table, startIdx); - if (overflowed) { - // *** FAILED MIGRATION *** - // TableMigration failed due to destination table overflow. - // No other thread can declare the migration successful at this point, because *this* unit will never complete, - // hence m_unitsRemaining won't reach zero. - // However, multiple threads can independently detect a failed migration at the same time. - TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); - // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before - // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from - // the thread - // that deals with it. - bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); - if (oldOverflowed) - TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), - uptr(oldOverflowed)); - m_workerStatus.fetchOr(1, turf::Relaxed); - goto endMigration; - } - sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed); - TURF_ASSERT(prevRemaining > 0); - if (prevRemaining == 1) { - // *** SUCCESSFUL MIGRATION *** - // That was the last chunk to migrate. - m_workerStatus.fetchOr(1, turf::Relaxed); - goto endMigration; - } - } - } - TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0); - -endMigration: - // Decrement the shared # of workers. - probeStatus = m_workerStatus.fetchSub( - 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. - if (probeStatus >= 4) { - // There are other workers remaining. Return here so that only the very last worker will proceed. - TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); - return; - } - - // We're the very last worker thread. - // Perform the appropriate post-migration step depending on whether the migration succeeded or failed. - TURF_ASSERT(probeStatus == 3); - bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point - if (!overflowed) { - // The migration succeeded. This is the most likely outcome. Publish the new subtree. - m_map.publishTableMigration(this); - // End the jobCoodinator. - getSources()[0].table->jobCoordinator.end(); - } else { - // The migration failed due to the overflow of the destination table. - Table* origTable = getSources()[0].table; - turf::LockGuard guard(origTable->mutex); - SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); - if (checkedJob != this) { - TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), - uptr(checkedJob)); - } else { - TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); - // Double the destination table size. - migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2); - // Transfer source tables to the new migration. - for (ureg i = 0; i < m_numSources; i++) { - migration->getSources()[i].table = getSources()[i].table; - getSources()[i].table = NULL; - migration->getSources()[i].sourceIndex.storeNonatomic(0); - } - migration->getSources()[m_numSources].table = m_destination; - migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0); - // Calculate total number of migration units to move. - ureg unitsRemaining = 0; - for (ureg s = 0; s < migration->m_numSources; s++) - unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits(); - migration->m_unitsRemaining.storeNonatomic(unitsRemaining); - // Publish the new migration. - origTable->jobCoordinator.storeRelease(migration); - } - } - - // We're done with this TableMigration. Queue it for GC. - DefaultQSBR.enqueue(&TableMigration::destroy, this); -} - -} // namespace details -} // namespace junction - -#endif // JUNCTION_DETAILS_LEAPFROG_H diff --git a/junction/details/Leapfrog.cpp b/junction/details/Leapfrog.cpp new file mode 100644 index 0000000..367cd3d --- /dev/null +++ b/junction/details/Leapfrog.cpp @@ -0,0 +1,57 @@ +/*------------------------------------------------------------------------ + Junction: Concurrent data structures in C++ + Copyright (c) 2016 Jeff Preshing + + Distributed under the Simplified BSD License. + Original location: https://github.com/preshing/junction + + This software is distributed WITHOUT ANY WARRANTY; without even the + implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the LICENSE file for more information. +------------------------------------------------------------------------*/ + +#include +#include +#include + +namespace junction { +namespace details { + +TURF_TRACE_DEFINE_BEGIN(Leapfrog, 33) // autogenerated by TidySource.py +TURF_TRACE_DEFINE("[find] called") +TURF_TRACE_DEFINE("[find] found existing cell optimistically") +TURF_TRACE_DEFINE("[find] found existing cell") +TURF_TRACE_DEFINE("[insertOrFind] called") +TURF_TRACE_DEFINE("[insertOrFind] reserved first cell") +TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell") +TURF_TRACE_DEFINE("[insertOrFind] found in first cell") +TURF_TRACE_DEFINE("[insertOrFind] race to read hash") +TURF_TRACE_DEFINE("[insertOrFind] found in probe chain") +TURF_TRACE_DEFINE("[insertOrFind] reserved cell") +TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell") +TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain") +TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket") +TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell") +TURF_TRACE_DEFINE("[insertOrFind] overflow") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] called") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists") +TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)") +TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size") +TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected") +TURF_TRACE_DEFINE("[migrateRange] race to insert key") +TURF_TRACE_DEFINE("[migrateRange] race to insert value") +TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect") +TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected") +TURF_TRACE_DEFINE("[migrateRange] racing update was erase") +TURF_TRACE_DEFINE("[migrateRange] race to update migrated value") +TURF_TRACE_DEFINE("[TableMigration::run] already ended") +TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set") +TURF_TRACE_DEFINE("[TableMigration::run] destination overflow") +TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed") +TURF_TRACE_DEFINE("[TableMigration::run] out of migration units") +TURF_TRACE_DEFINE("[TableMigration::run] not the last worker") +TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started") +TURF_TRACE_DEFINE_END(Leapfrog, 33) + +} // namespace details +} // namespace junction diff --git a/junction/details/Leapfrog.h b/junction/details/Leapfrog.h new file mode 100644 index 0000000..33fd5be --- /dev/null +++ b/junction/details/Leapfrog.h @@ -0,0 +1,577 @@ +/*------------------------------------------------------------------------ + Junction: Concurrent data structures in C++ + Copyright (c) 2016 Jeff Preshing + + Distributed under the Simplified BSD License. + Original location: https://github.com/preshing/junction + + This software is distributed WITHOUT ANY WARRANTY; without even the + implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the LICENSE file for more information. +------------------------------------------------------------------------*/ + +#ifndef JUNCTION_DETAILS_LEAPFROG_H +#define JUNCTION_DETAILS_LEAPFROG_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +// Enable this to force migration overflows (for test purposes): +#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0 + +namespace junction { +namespace details { + +TURF_TRACE_DECLARE(Leapfrog, 33) + +template +struct Leapfrog { + typedef typename Map::Hash Hash; + typedef typename Map::Value Value; + typedef typename Map::KeyTraits KeyTraits; + typedef typename Map::ValueTraits ValueTraits; + + static const ureg InitialSize = 8; + static const ureg TableMigrationUnitSize = 32; + static const ureg LinearSearchLimit = 128; + static const ureg CellsInUseSample = LinearSearchLimit; + TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links + TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain + + struct Cell { + turf::Atomic hash; + turf::Atomic value; + }; + + struct CellGroup { + // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain. + // Each cell in the probe chain is located within the table itself. + // "deltas" determines the index of the next cell in the probe chain. + // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket. + // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket. + // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket. + turf::Atomic deltas[8]; + Cell cells[4]; + }; + + struct Table { + const ureg sizeMask; // a power of two minus one + turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator) + SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration + + Table(ureg sizeMask) : sizeMask(sizeMask) { + } + + static Table* create(ureg tableSize) { + TURF_ASSERT(turf::util::isPowerOf2(tableSize)); + TURF_ASSERT(tableSize >= 4); + ureg numGroups = tableSize >> 2; + Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups); + new (table) Table(tableSize - 1); + for (ureg i = 0; i < numGroups; i++) { + CellGroup* group = table->getCellGroups() + i; + for (ureg j = 0; j < 4; j++) { + group->deltas[j].storeNonatomic(0); + group->deltas[j + 4].storeNonatomic(0); + group->cells[j].hash.storeNonatomic(KeyTraits::NullHash); + group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue)); + } + } + return table; + } + + void destroy() { + this->Table::~Table(); + TURF_HEAP.free(this); + } + + CellGroup* getCellGroups() const { + return (CellGroup*) (this + 1); + } + + ureg getNumMigrationUnits() const { + return sizeMask / TableMigrationUnitSize + 1; + } + }; + + class TableMigration : public SimpleJobCoordinator::Job { + public: + struct Source { + Table* table; + turf::Atomic sourceIndex; + }; + + Map& m_map; + Table* m_destination; + turf::Atomic m_workerStatus; // number of workers + end flag + turf::Atomic m_overflowed; + turf::Atomic m_unitsRemaining; + ureg m_numSources; + + TableMigration(Map& map) : m_map(map) { + } + + static TableMigration* create(Map& map, ureg numSources) { + TableMigration* migration = + (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources); + new (migration) TableMigration(map); + migration->m_workerStatus.storeNonatomic(0); + migration->m_overflowed.storeNonatomic(false); + migration->m_unitsRemaining.storeNonatomic(0); + migration->m_numSources = numSources; + // Caller is responsible for filling in sources & destination + return migration; + } + + virtual ~TableMigration() TURF_OVERRIDE { + } + + void destroy() { + // Destroy all source tables. + for (ureg i = 0; i < m_numSources; i++) + if (getSources()[i].table) + getSources()[i].table->destroy(); + // Delete the migration object itself. + this->TableMigration::~TableMigration(); + TURF_HEAP.free(this); + } + + Source* getSources() const { + return (Source*) (this + 1); + } + + bool migrateRange(Table* srcTable, ureg startIdx); + virtual void run() TURF_OVERRIDE; + }; + + static Cell* find(Hash hash, Table* table) { + TURF_TRACE(Leapfrog, 0, "[find] called", uptr(table), hash); + TURF_ASSERT(table); + TURF_ASSERT(hash != KeyTraits::NullHash); + ureg sizeMask = table->sizeMask; + // Optimistically check hashed cell even though it might belong to another bucket + ureg idx = hash & sizeMask; + CellGroup* group = table->getCellGroups() + (idx >> 2); + Cell* cell = group->cells + (idx & 3); + Hash probeHash = cell->hash.load(turf::Relaxed); + if (probeHash == hash) { + TURF_TRACE(Leapfrog, 1, "[find] found existing cell optimistically", uptr(table), idx); + return cell; + } else if (probeHash == KeyTraits::NullHash) { + return cell = NULL; + } + // Follow probe chain for our bucket + u8 delta = group->deltas[idx & 3].load(turf::Relaxed); + while (delta) { + idx = (idx + delta) & sizeMask; + group = table->getCellGroups() + (idx >> 2); + cell = group->cells + (idx & 3); + Hash probeHash = cell->hash.load(turf::Relaxed); + // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert, + // but we don't check for it. We just follow the probe chain. + if (probeHash == hash) { + TURF_TRACE(Leapfrog, 2, "[find] found existing cell", uptr(table), idx); + return cell; + } + delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed); + } + // End of probe chain, not found + return NULL; + } + + // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound. + enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow }; + static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) { + TURF_TRACE(Leapfrog, 3, "[insertOrFind] called", uptr(table), hash); + TURF_ASSERT(table); + TURF_ASSERT(hash != KeyTraits::NullHash); + ureg sizeMask = table->sizeMask; + ureg idx = ureg(hash); + + // Check hashed cell first, though it may not even belong to the bucket. + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + cell = group->cells + (idx & 3); + Hash probeHash = cell->hash.load(turf::Relaxed); + if (probeHash == KeyTraits::NullHash) { + if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) { + TURF_TRACE(Leapfrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx); + // There are no links to set. We're done. + return InsertResult_InsertedNew; + } else { + TURF_TRACE(Leapfrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx); + // Fall through to check if it was the same hash... + } + } + if (probeHash == hash) { + TURF_TRACE(Leapfrog, 6, "[insertOrFind] found in first cell", uptr(table), idx); + return InsertResult_AlreadyFound; + } + + // Follow the link chain for this bucket. + ureg maxIdx = idx + sizeMask; + ureg linkLevel = 0; + turf::Atomic* prevLink; + for (;;) { + followLink: + prevLink = group->deltas + ((idx & 3) + linkLevel); + linkLevel = 4; + u8 probeDelta = prevLink->load(turf::Relaxed); + if (probeDelta) { + idx += probeDelta; + // Check the hash for this cell. + group = table->getCellGroups() + ((idx & sizeMask) >> 2); + cell = group->cells + (idx & 3); + probeHash = cell->hash.load(turf::Relaxed); + if (probeHash == KeyTraits::NullHash) { + // Cell was linked, but hash is not visible yet. + // We could avoid this case (and guarantee it's visible) using acquire & release, but instead, + // just poll until it becomes visible. + TURF_TRACE(Leapfrog, 7, "[insertOrFind] race to read hash", uptr(table), idx); + do { + probeHash = cell->hash.load(turf::Acquire); + } while (probeHash == KeyTraits::NullHash); + } + TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked + if (probeHash == hash) { + TURF_TRACE(Leapfrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx); + return InsertResult_AlreadyFound; + } + } else { + // Reached the end of the link chain for this bucket. + // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket. + ureg prevLinkIdx = idx; + TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range. + ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit); + while (linearProbesRemaining-- > 0) { + idx++; + group = table->getCellGroups() + ((idx & sizeMask) >> 2); + cell = group->cells + (idx & 3); + probeHash = cell->hash.load(turf::Relaxed); + if (probeHash == KeyTraits::NullHash) { + // It's an empty cell. Try to reserve it. + if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) { + // Success. We've reserved the cell. Link it to previous cell in same bucket. + TURF_TRACE(Leapfrog, 9, "[insertOrFind] reserved cell", uptr(table), idx); + TURF_ASSERT(probeDelta == 0); + u8 desiredDelta = idx - prevLinkIdx; +#if TURF_WITH_ASSERTS + probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); + TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); +#else + prevLink->store(desiredDelta, turf::Relaxed); +#endif + return InsertResult_InsertedNew; + } else { + TURF_TRACE(Leapfrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx); + // Fall through to check if it's the same hash... + } + } + Hash x = (probeHash ^ hash); + // Check for same hash. + if (!x) { + TURF_TRACE(Leapfrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx); + return InsertResult_AlreadyFound; + } + // Check for same bucket. + if ((x & sizeMask) == 0) { + TURF_TRACE(Leapfrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx); + // Attempt to set the link on behalf of the late-arriving cell. + // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here, + // there's no guarantee that our own link chain will be well-formed by the time this function returns. + // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.) + u8 desiredDelta = idx - prevLinkIdx; +#if TURF_WITH_ASSERTS + probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed); + TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta); + if (probeDelta == 0) + TURF_TRACE(Leapfrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx); +#else + prevLink->store(desiredDelta, turf::Relaxed); +#endif + goto followLink; // Try to follow link chain for the bucket again. + } + // Continue linear search... + } + // Table is too full to insert. + overflowIdx = idx + 1; + TURF_TRACE(Leapfrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx); + return InsertResult_Overflow; + } + } + } + + static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) { + // Create new migration by DCLI. + TURF_TRACE(Leapfrog, 15, "[beginTableMigrationToSize] called", 0, 0); + SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume(); + if (job) { + TURF_TRACE(Leapfrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0); + } else { + turf::LockGuard guard(table->mutex); + job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK. + if (job) { + TURF_TRACE(Leapfrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0); + } else { + // Create new migration. + TableMigration* migration = TableMigration::create(map, 1); + migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits()); + migration->getSources()[0].table = table; + migration->getSources()[0].sourceIndex.storeNonatomic(0); + migration->m_destination = Table::create(nextTableSize); + // Publish the new migration. + table->jobCoordinator.storeRelease(migration); + } + } + } + + static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) { + // Estimate number of cells in use based on a small sample. + ureg sizeMask = table->sizeMask; + ureg idx = overflowIdx - CellsInUseSample; + ureg inUseCells = 0; + for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) { + CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2); + Cell* cell = group->cells + (idx & 3); + Value value = cell->value.load(turf::Relaxed); + if (value == Value(ValueTraits::Redirect)) { + // Another thread kicked off the jobCoordinator. The caller will participate upon return. + TURF_TRACE(Leapfrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0); + return; + } + if (value != Value(ValueTraits::NullValue)) + inUseCells++; + idx++; + } + float inUseRatio = float(inUseCells) / CellsInUseSample; + float estimatedInUse = (sizeMask + 1) * inUseRatio; +#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS + // Periodically underestimate the number of cells in use. + // This exercises the code that handles overflow during migration. + static ureg counter = 1; + if ((++counter & 3) == 0) { + estimatedInUse /= 4; + } +#endif + ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2))); + beginTableMigrationToSize(map, table, nextTableSize); + } +}; // Leapfrog + +template +bool Leapfrog::TableMigration::migrateRange(Table* srcTable, ureg startIdx) { + ureg srcSizeMask = srcTable->sizeMask; + ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1); + // Iterate over source range. + for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) { + CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2); + Cell* srcCell = srcGroup->cells + (srcIdx & 3); + Hash srcHash; + Value srcValue; + // Fetch the srcHash and srcValue. + for (;;) { + srcHash = srcCell->hash.load(turf::Relaxed); + if (srcHash == KeyTraits::NullHash) { + // An unused cell. Try to put a Redirect marker in its value. + srcValue = + srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed); + if (srcValue == Value(ValueTraits::Redirect)) { + // srcValue is already marked Redirect due to previous incomplete migration. + TURF_TRACE(Leapfrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx); + break; + } + if (srcValue == Value(ValueTraits::NullValue)) + break; // Redirect has been placed. Break inner loop, continue outer loop. + TURF_TRACE(Leapfrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx); + // Otherwise, somebody just claimed the cell. Read srcHash again... + } else { + // Check for deleted/uninitialized value. + srcValue = srcCell->value.load(turf::Relaxed); + if (srcValue == Value(ValueTraits::NullValue)) { + // Try to put a Redirect marker. + if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed)) + break; // Redirect has been placed. Break inner loop, continue outer loop. + TURF_TRACE(Leapfrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx); + if (srcValue == Value(ValueTraits::Redirect)) { + // FIXME: I don't think this will happen. Investigate & change to assert + TURF_TRACE(Leapfrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx); + break; + } + } else if (srcValue == Value(ValueTraits::Redirect)) { + // srcValue is already marked Redirect due to previous incomplete migration. + TURF_TRACE(Leapfrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx); + break; + } + + // We've got a key/value pair to migrate. + // Reserve a destination cell in the destination. + TURF_ASSERT(srcHash != KeyTraits::NullHash); + TURF_ASSERT(srcValue != Value(ValueTraits::NullValue)); + TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); + Cell* dstCell; + ureg overflowIdx; + InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx); + // During migration, a hash can only exist in one place among all the source tables, + // and it is only migrated by one thread. Therefore, the hash will never already exist + // in the destination table: + TURF_ASSERT(result != InsertResult_AlreadyFound); + if (result == InsertResult_Overflow) { + // Destination overflow. + // This can happen for several reasons. For example, the source table could have + // existed of all deleted cells when it overflowed, resulting in a small destination + // table size, but then another thread could re-insert all the same hashes + // before the migration completed. + // Caller will cancel the current migration and begin a new one. + return false; + } + // Migrate the old value to the new cell. + for (;;) { + // Copy srcValue to the destination. + dstCell->value.store(srcValue, turf::Relaxed); + // Try to place a Redirect marker in srcValue. + Value doubleCheckedSrcValue = + srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed); + TURF_ASSERT(doubleCheckedSrcValue != + Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time. + if (doubleCheckedSrcValue == srcValue) { + // No racing writes to the src. We've successfully placed the Redirect marker. + // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL + // by a late-arriving erase. + if (srcValue == Value(ValueTraits::NullValue)) + TURF_TRACE(Leapfrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx); + break; + } + // There was a late-arriving write (or erase) to the src. Migrate the new value and try again. + TURF_TRACE(Leapfrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx); + srcValue = doubleCheckedSrcValue; + } + // Cell successfully migrated. Proceed to next source cell. + break; + } + } + } + // Range has been migrated successfully. + return true; +} + +template +void Leapfrog::TableMigration::run() { + // Conditionally increment the shared # of workers. + ureg probeStatus = m_workerStatus.load(turf::Relaxed); + do { + if (probeStatus & 1) { + // End flag is already set, so do nothing. + TURF_TRACE(Leapfrog, 26, "[TableMigration::run] already ended", uptr(this), 0); + return; + } + } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed)); + // # of workers has been incremented, and the end flag is clear. + TURF_ASSERT((probeStatus & 1) == 0); + + // Iterate over all source tables. + for (ureg s = 0; s < m_numSources; s++) { + Source& source = getSources()[s]; + // Loop over all migration units in this source table. + for (;;) { + if (m_workerStatus.load(turf::Relaxed) & 1) { + TURF_TRACE(Leapfrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0); + goto endMigration; + } + ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed); + if (startIdx >= source.table->sizeMask + 1) + break; // No more migration units in this table. Try next source table. + bool overflowed = !migrateRange(source.table, startIdx); + if (overflowed) { + // *** FAILED MIGRATION *** + // TableMigration failed due to destination table overflow. + // No other thread can declare the migration successful at this point, because *this* unit will never complete, + // hence m_unitsRemaining won't reach zero. + // However, multiple threads can independently detect a failed migration at the same time. + TURF_TRACE(Leapfrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx)); + // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before + // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from + // the thread + // that deals with it. + bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed); + if (oldOverflowed) + TURF_TRACE(Leapfrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed), + uptr(oldOverflowed)); + m_workerStatus.fetchOr(1, turf::Relaxed); + goto endMigration; + } + sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed); + TURF_ASSERT(prevRemaining > 0); + if (prevRemaining == 1) { + // *** SUCCESSFUL MIGRATION *** + // That was the last chunk to migrate. + m_workerStatus.fetchOr(1, turf::Relaxed); + goto endMigration; + } + } + } + TURF_TRACE(Leapfrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0); + +endMigration: + // Decrement the shared # of workers. + probeStatus = m_workerStatus.fetchSub( + 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread. + if (probeStatus >= 4) { + // There are other workers remaining. Return here so that only the very last worker will proceed. + TURF_TRACE(Leapfrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus)); + return; + } + + // We're the very last worker thread. + // Perform the appropriate post-migration step depending on whether the migration succeeded or failed. + TURF_ASSERT(probeStatus == 3); + bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point + if (!overflowed) { + // The migration succeeded. This is the most likely outcome. Publish the new subtree. + m_map.publishTableMigration(this); + // End the jobCoodinator. + getSources()[0].table->jobCoordinator.end(); + } else { + // The migration failed due to the overflow of the destination table. + Table* origTable = getSources()[0].table; + turf::LockGuard guard(origTable->mutex); + SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume(); + if (checkedJob != this) { + TURF_TRACE(Leapfrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable), + uptr(checkedJob)); + } else { + TableMigration* migration = TableMigration::create(m_map, m_numSources + 1); + // Double the destination table size. + migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2); + // Transfer source tables to the new migration. + for (ureg i = 0; i < m_numSources; i++) { + migration->getSources()[i].table = getSources()[i].table; + getSources()[i].table = NULL; + migration->getSources()[i].sourceIndex.storeNonatomic(0); + } + migration->getSources()[m_numSources].table = m_destination; + migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0); + // Calculate total number of migration units to move. + ureg unitsRemaining = 0; + for (ureg s = 0; s < migration->m_numSources; s++) + unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits(); + migration->m_unitsRemaining.storeNonatomic(unitsRemaining); + // Publish the new migration. + origTable->jobCoordinator.storeRelease(migration); + } + } + + // We're done with this TableMigration. Queue it for GC. + DefaultQSBR.enqueue(&TableMigration::destroy, this); +} + +} // namespace details +} // namespace junction + +#endif // JUNCTION_DETAILS_LEAPFROG_H diff --git a/junction/extra/impl/MapAdapter_LeapFrog.h b/junction/extra/impl/MapAdapter_LeapFrog.h deleted file mode 100644 index c277044..0000000 --- a/junction/extra/impl/MapAdapter_LeapFrog.h +++ /dev/null @@ -1,62 +0,0 @@ -/*------------------------------------------------------------------------ - Junction: Concurrent data structures in C++ - Copyright (c) 2016 Jeff Preshing - - Distributed under the Simplified BSD License. - Original location: https://github.com/preshing/junction - - This software is distributed WITHOUT ANY WARRANTY; without even the - implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. - See the LICENSE file for more information. -------------------------------------------------------------------------*/ - -#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H -#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H - -#include -#include -#include -#include - -namespace junction { -namespace extra { - -class MapAdapter { -public: - static TURF_CONSTEXPR const char* MapName = "Junction LeapFrog map"; - - MapAdapter(ureg) { - } - - class ThreadContext { - private: - QSBR::Context m_qsbrContext; - - public: - ThreadContext(MapAdapter&, ureg) { - } - - void registerThread() { - m_qsbrContext = DefaultQSBR.createContext(); - } - - void unregisterThread() { - DefaultQSBR.destroyContext(m_qsbrContext); - } - - void update() { - DefaultQSBR.update(m_qsbrContext); - } - }; - - typedef ConcurrentMap_LeapFrog Map; - - static ureg getInitialCapacity(ureg maxPopulation) { - return turf::util::roundUpPowerOf2(maxPopulation / 4); - } -}; - -} // namespace extra -} // namespace junction - -#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H diff --git a/junction/extra/impl/MapAdapter_Leapfrog.h b/junction/extra/impl/MapAdapter_Leapfrog.h new file mode 100644 index 0000000..7661725 --- /dev/null +++ b/junction/extra/impl/MapAdapter_Leapfrog.h @@ -0,0 +1,62 @@ +/*------------------------------------------------------------------------ + Junction: Concurrent data structures in C++ + Copyright (c) 2016 Jeff Preshing + + Distributed under the Simplified BSD License. + Original location: https://github.com/preshing/junction + + This software is distributed WITHOUT ANY WARRANTY; without even the + implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. + See the LICENSE file for more information. +------------------------------------------------------------------------*/ + +#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H +#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H + +#include +#include +#include +#include + +namespace junction { +namespace extra { + +class MapAdapter { +public: + static TURF_CONSTEXPR const char* MapName = "Junction Leapfrog map"; + + MapAdapter(ureg) { + } + + class ThreadContext { + private: + QSBR::Context m_qsbrContext; + + public: + ThreadContext(MapAdapter&, ureg) { + } + + void registerThread() { + m_qsbrContext = DefaultQSBR.createContext(); + } + + void unregisterThread() { + DefaultQSBR.destroyContext(m_qsbrContext); + } + + void update() { + DefaultQSBR.update(m_qsbrContext); + } + }; + + typedef ConcurrentMap_Leapfrog Map; + + static ureg getInitialCapacity(ureg maxPopulation) { + return turf::util::roundUpPowerOf2(maxPopulation / 4); + } +}; + +} // namespace extra +} // namespace junction + +#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H diff --git a/samples/MapMemoryBench/TestAllMaps.py b/samples/MapMemoryBench/TestAllMaps.py index 1dfc5fa..f84ef12 100644 --- a/samples/MapMemoryBench/TestAllMaps.py +++ b/samples/MapMemoryBench/TestAllMaps.py @@ -10,7 +10,7 @@ CMAKE = os.getenv('CMAKE', 'cmake') ALL_MAPS = [ ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1']), ('linear', 'junction/extra/impl/MapAdapter_Linear.h', []), - ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', []), + ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', []), ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', []), ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', []), ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1']), diff --git a/samples/MapPerformanceTests/TestAllMaps.py b/samples/MapPerformanceTests/TestAllMaps.py index 1e76984..792e196 100644 --- a/samples/MapPerformanceTests/TestAllMaps.py +++ b/samples/MapPerformanceTests/TestAllMaps.py @@ -11,7 +11,7 @@ ALL_MAPS = [ ('null', 'junction/extra/impl/MapAdapter_Null.h', [], ['-i256', '-c10']), ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c10']), ('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i256', '-c10']), - ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i256', '-c10']), + ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i256', '-c10']), ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i256', '-c10']), ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i256', '-c10']), ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c1']), diff --git a/samples/MapScalabilityTests/TestAllMaps.py b/samples/MapScalabilityTests/TestAllMaps.py index bc9cbd5..83359b6 100644 --- a/samples/MapScalabilityTests/TestAllMaps.py +++ b/samples/MapScalabilityTests/TestAllMaps.py @@ -10,7 +10,7 @@ CMAKE = os.getenv('CMAKE', 'cmake') ALL_MAPS = [ ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i10000', '-c200']), ('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i10000', '-c200']), - ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i10000', '-c200']), + ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i10000', '-c200']), ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i10000', '-c200']), ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i10000', '-c10']), ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i2000', '-c1']),