junction::ConcurrentMap_Crude
junction::ConcurrentMap_Linear
- junction::ConcurrentMap_LeapFrog
+ junction::ConcurrentMap_Leapfrog
junction::ConcurrentMap_Grampa
[CMake](https://cmake.org/) and [Turf](https://github.com/preshing/turf) are required. See the blog post [New Concurrent Hash Maps for C++](http://preshing.com/20160201/new-concurrent-hash-maps-for-cpp/) for more information.
* All of a Junction map's member functions, together with its `Mutator` member functions, are atomic with respect to each other, so you can safely call them from any thread without mutual exclusion.
* If an `set` [happens before](http://preshing.com/20130702/the-happens-before-relation/) a `get` with the same key, the `get` will return the value set, except if another operation changes the value in between. Any [synchronizing operation](http://preshing.com/20130823/the-synchronizes-with-relation/) will establish this relationship.
-* For Linear, LeapFrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed.
+* For Linear, Leapfrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed.
* In the current version, you must not set while concurrently using an `Iterator`.
## Feedback
+++ /dev/null
-/*------------------------------------------------------------------------
- Junction: Concurrent data structures in C++
- Copyright (c) 2016 Jeff Preshing
-
- Distributed under the Simplified BSD License.
- Original location: https://github.com/preshing/junction
-
- This software is distributed WITHOUT ANY WARRANTY; without even the
- implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#include <junction/ConcurrentMap_LeapFrog.h>
-
-namespace junction {
-
-TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_LeapFrog, 17) // autogenerated by TidySource.py
-TURF_TRACE_DEFINE("[Mutator] find constructor called")
-TURF_TRACE_DEFINE("[Mutator] find was redirected")
-TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called")
-TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] called")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] called")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected")
-TURF_TRACE_DEFINE("[get] called")
-TURF_TRACE_DEFINE("[get] was redirected")
-TURF_TRACE_DEFINE_END(ConcurrentMap_LeapFrog, 17)
-
-} // namespace junction
+++ /dev/null
-/*------------------------------------------------------------------------
- Junction: Concurrent data structures in C++
- Copyright (c) 2016 Jeff Preshing
-
- Distributed under the Simplified BSD License.
- Original location: https://github.com/preshing/junction
-
- This software is distributed WITHOUT ANY WARRANTY; without even the
- implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H
-#define JUNCTION_CONCURRENTMAP_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <junction/details/LeapFrog.h>
-#include <junction/QSBR.h>
-#include <turf/Heap.h>
-#include <turf/Trace.h>
-
-namespace junction {
-
-TURF_TRACE_DECLARE(ConcurrentMap_LeapFrog, 17)
-
-template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
-class ConcurrentMap_LeapFrog {
-public:
- typedef K Key;
- typedef V Value;
- typedef KT KeyTraits;
- typedef VT ValueTraits;
- typedef typename turf::util::BestFit<Key>::Unsigned Hash;
- typedef details::LeapFrog<ConcurrentMap_LeapFrog> Details;
-
-private:
- turf::Atomic<typename Details::Table*> m_root;
-
-public:
- ConcurrentMap_LeapFrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
- }
-
- ~ConcurrentMap_LeapFrog() {
- typename Details::Table* table = m_root.loadNonatomic();
- table->destroy();
- }
-
- // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
- // after all the threads participating in the migration have completed their work.
- void publishTableMigration(typename Details::TableMigration* migration) {
- // There are no racing calls to this function.
- typename Details::Table* oldRoot = m_root.loadNonatomic();
- m_root.store(migration->m_destination, turf::Release);
- TURF_ASSERT(oldRoot == migration->getSources()[0].table);
- // Caller will GC the TableMigration and the source table.
- }
-
- // A Mutator represents a known cell in the hash table.
- // It's meant for manipulations within a temporary function scope.
- // Obviously you must not call QSBR::Update while holding a Mutator.
- // Any operation that modifies the table (exchangeValue, eraseValue)
- // may be forced to follow a redirected cell, which changes the Mutator itself.
- // Note that even if the Mutator was constructed from an existing cell,
- // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
- // or if another thread deletes the key between the two steps.
- class Mutator {
- private:
- friend class ConcurrentMap_LeapFrog;
-
- ConcurrentMap_LeapFrog& m_map;
- typename Details::Table* m_table;
- typename Details::Cell* m_cell;
- Value m_value;
-
- // Constructor: Find existing cell
- Mutator(ConcurrentMap_LeapFrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key));
- Hash hash = KeyTraits::hash(key);
- for (;;) {
- m_table = m_map.m_root.load(turf::Consume);
- m_cell = Details::find(hash, m_table);
- if (!m_cell)
- return;
- m_value = m_cell->value.load(turf::Consume);
- if (m_value != Value(ValueTraits::Redirect))
- return; // Found an existing value
- // We've encountered a Redirect value. Help finish the migration.
- TURF_TRACE(ConcurrentMap_LeapFrog, 1, "[Mutator] find was redirected", uptr(m_table), 0);
- m_table->jobCoordinator.participate();
- // Try again using the latest root.
- }
- }
-
- // Constructor: Insert or find cell
- Mutator(ConcurrentMap_LeapFrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key));
- Hash hash = KeyTraits::hash(key);
- for (;;) {
- m_table = m_map.m_root.load(turf::Consume);
- ureg overflowIdx;
- switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
- case Details::InsertResult_InsertedNew: {
- // We've inserted a new cell. Don't load m_cell->value.
- return;
- }
- case Details::InsertResult_AlreadyFound: {
- // The hash was already found in the table.
- m_value = m_cell->value.load(turf::Consume);
- if (m_value == Value(ValueTraits::Redirect)) {
- // We've encountered a Redirect value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value));
- break; // Help finish the migration.
- }
- return; // Found an existing value
- }
- case Details::InsertResult_Overflow: {
- Details::beginTableMigration(m_map, m_table, overflowIdx);
- break;
- }
- }
- // A migration has been started (either by us, or another thread). Participate until it's complete.
- m_table->jobCoordinator.participate();
- // Try again using the latest root.
- }
- }
-
- public:
- Value getValue() const {
- // Return previously loaded value. Don't load it again.
- return Value(m_value);
- }
-
- Value exchangeValue(Value desired) {
- TURF_ASSERT(desired != Value(ValueTraits::NullValue));
- TURF_ASSERT(desired != Value(ValueTraits::Redirect));
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
- TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
- for (;;) {
- Value oldValue = m_value;
- if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
- // Exchange was successful. Return previous value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
- uptr(desired));
- Value result = m_value;
- m_value = desired; // Leave the mutator in a valid state
- return result;
- }
- // The CAS failed and m_value has been updated with the latest value.
- if (m_value != Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
- uptr(m_value));
- if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value",
- uptr(m_table), uptr(m_value));
- }
- // There was a racing write (or erase) to this cell.
- // Pretend we exchanged with ourselves, and just let the racing write win.
- return desired;
- }
- // We've encountered a Redirect value. Help finish the migration.
- TURF_TRACE(ConcurrentMap_LeapFrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
- Hash hash = m_cell->hash.load(turf::Relaxed);
- for (;;) {
- // Help complete the migration.
- m_table->jobCoordinator.participate();
- // Try again in the new table.
- m_table = m_map.m_root.load(turf::Consume);
- m_value = Value(ValueTraits::NullValue);
- ureg overflowIdx;
- switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
- case Details::InsertResult_AlreadyFound:
- m_value = m_cell->value.load(turf::Consume);
- if (m_value == Value(ValueTraits::Redirect)) {
- TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
- uptr(m_value));
- break;
- }
- goto breakOuter;
- case Details::InsertResult_InsertedNew:
- goto breakOuter;
- case Details::InsertResult_Overflow:
- TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
- overflowIdx);
- Details::beginTableMigration(m_map, m_table, overflowIdx);
- break;
- }
- // We were redirected... again
- }
- breakOuter:;
- // Try again in the new table.
- }
- }
-
- void setValue(Value desired) {
- exchangeValue(desired);
- }
-
- Value eraseValue() {
- TURF_ASSERT(m_cell); // Cell must have been found or inserted
- TURF_TRACE(ConcurrentMap_LeapFrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell));
- for (;;) {
- if (m_value == Value(ValueTraits::NullValue))
- return Value(m_value);
- TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
- if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
- // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
- TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
- Value result = m_value;
- m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
- return result;
- }
- // The CAS failed and m_value has been updated with the latest value.
- TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
- uptr(m_cell));
- if (m_value != Value(ValueTraits::Redirect)) {
- // There was a racing write (or erase) to this cell.
- // Pretend we erased nothing, and just let the racing write win.
- return Value(ValueTraits::NullValue);
- }
- // We've been redirected to a new table.
- TURF_TRACE(ConcurrentMap_LeapFrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
- Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
- for (;;) {
- // Help complete the migration.
- m_table->jobCoordinator.participate();
- // Try again in the new table.
- m_table = m_map.m_root.load(turf::Consume);
- m_cell = Details::find(hash, m_table);
- if (!m_cell) {
- m_value = Value(ValueTraits::NullValue);
- return m_value;
- }
- m_value = m_cell->value.load(turf::Relaxed);
- if (m_value != Value(ValueTraits::Redirect))
- break;
- TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
- uptr(m_cell));
- }
- }
- }
- };
-
- Mutator insertOrFind(Key key) {
- return Mutator(*this, key);
- }
-
- Mutator find(Key key) {
- return Mutator(*this, key, false);
- }
-
- // Lookup without creating a temporary Mutator.
- Value get(Key key) {
- Hash hash = KeyTraits::hash(key);
- TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(this), uptr(hash));
- for (;;) {
- typename Details::Table* table = m_root.load(turf::Consume);
- typename Details::Cell* cell = Details::find(hash, table);
- if (!cell)
- return Value(ValueTraits::NullValue);
- Value value = cell->value.load(turf::Consume);
- if (value != Value(ValueTraits::Redirect))
- return value; // Found an existing value
- // We've been redirected to a new table. Help with the migration.
- TURF_TRACE(ConcurrentMap_LeapFrog, 16, "[get] was redirected", uptr(table), uptr(hash));
- table->jobCoordinator.participate();
- // Try again in the new table.
- }
- }
-
- Value set(Key key, Value desired) {
- Mutator iter(*this, key);
- return iter.exchangeValue(desired);
- }
-
- Value exchange(Key key, Value desired) {
- Mutator iter(*this, key);
- return iter.exchangeValue(desired);
- }
-
- Value erase(Key key) {
- Mutator iter(*this, key, false);
- return iter.eraseValue();
- }
-
- // The easiest way to implement an Iterator is to prevent all Redirects.
- // The currrent Iterator does that by forbidding concurrent inserts.
- // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
- class Iterator {
- private:
- typename Details::Table* m_table;
- ureg m_idx;
- Key m_hash;
- Value m_value;
-
- public:
- Iterator(ConcurrentMap_LeapFrog& map) {
- // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
- m_table = map.m_root.load(turf::Consume);
- m_idx = -1;
- next();
- }
-
- void next() {
- TURF_ASSERT(m_table);
- TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
- while (++m_idx <= m_table->sizeMask) {
- // Index still inside range of table.
- typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
- typename Details::Cell* cell = group->cells + (m_idx & 3);
- m_hash = cell->hash.load(turf::Relaxed);
- if (m_hash != KeyTraits::NullHash) {
- // Cell has been reserved.
- m_value = cell->value.load(turf::Relaxed);
- TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
- if (m_value != Value(ValueTraits::NullValue))
- return; // Yield this cell.
- }
- }
- // That's the end of the map.
- m_hash = KeyTraits::NullHash;
- m_value = Value(ValueTraits::NullValue);
- }
-
- bool isValid() const {
- return m_value != Value(ValueTraits::NullValue);
- }
-
- Key getKey() const {
- TURF_ASSERT(isValid());
- // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
- return KeyTraits::dehash(m_hash);
- }
-
- Value getValue() const {
- TURF_ASSERT(isValid());
- return m_value;
- }
- };
-};
-
-} // namespace junction
-
-#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H
--- /dev/null
+/*------------------------------------------------------------------------
+ Junction: Concurrent data structures in C++
+ Copyright (c) 2016 Jeff Preshing
+
+ Distributed under the Simplified BSD License.
+ Original location: https://github.com/preshing/junction
+
+ This software is distributed WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#include <junction/ConcurrentMap_Leapfrog.h>
+
+namespace junction {
+
+TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Leapfrog, 17) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE("[Mutator] find constructor called")
+TURF_TRACE_DEFINE("[Mutator] find was redirected")
+TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called")
+TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] called")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] called")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected")
+TURF_TRACE_DEFINE("[get] called")
+TURF_TRACE_DEFINE("[get] was redirected")
+TURF_TRACE_DEFINE_END(ConcurrentMap_Leapfrog, 17)
+
+} // namespace junction
--- /dev/null
+/*------------------------------------------------------------------------
+ Junction: Concurrent data structures in C++
+ Copyright (c) 2016 Jeff Preshing
+
+ Distributed under the Simplified BSD License.
+ Original location: https://github.com/preshing/junction
+
+ This software is distributed WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H
+#define JUNCTION_CONCURRENTMAP_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <junction/details/Leapfrog.h>
+#include <junction/QSBR.h>
+#include <turf/Heap.h>
+#include <turf/Trace.h>
+
+namespace junction {
+
+TURF_TRACE_DECLARE(ConcurrentMap_Leapfrog, 17)
+
+template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
+class ConcurrentMap_Leapfrog {
+public:
+ typedef K Key;
+ typedef V Value;
+ typedef KT KeyTraits;
+ typedef VT ValueTraits;
+ typedef typename turf::util::BestFit<Key>::Unsigned Hash;
+ typedef details::Leapfrog<ConcurrentMap_Leapfrog> Details;
+
+private:
+ turf::Atomic<typename Details::Table*> m_root;
+
+public:
+ ConcurrentMap_Leapfrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
+ }
+
+ ~ConcurrentMap_Leapfrog() {
+ typename Details::Table* table = m_root.loadNonatomic();
+ table->destroy();
+ }
+
+ // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
+ // after all the threads participating in the migration have completed their work.
+ void publishTableMigration(typename Details::TableMigration* migration) {
+ // There are no racing calls to this function.
+ typename Details::Table* oldRoot = m_root.loadNonatomic();
+ m_root.store(migration->m_destination, turf::Release);
+ TURF_ASSERT(oldRoot == migration->getSources()[0].table);
+ // Caller will GC the TableMigration and the source table.
+ }
+
+ // A Mutator represents a known cell in the hash table.
+ // It's meant for manipulations within a temporary function scope.
+ // Obviously you must not call QSBR::Update while holding a Mutator.
+ // Any operation that modifies the table (exchangeValue, eraseValue)
+ // may be forced to follow a redirected cell, which changes the Mutator itself.
+ // Note that even if the Mutator was constructed from an existing cell,
+ // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
+ // or if another thread deletes the key between the two steps.
+ class Mutator {
+ private:
+ friend class ConcurrentMap_Leapfrog;
+
+ ConcurrentMap_Leapfrog& m_map;
+ typename Details::Table* m_table;
+ typename Details::Cell* m_cell;
+ Value m_value;
+
+ // Constructor: Find existing cell
+ Mutator(ConcurrentMap_Leapfrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
+ TURF_TRACE(ConcurrentMap_Leapfrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key));
+ Hash hash = KeyTraits::hash(key);
+ for (;;) {
+ m_table = m_map.m_root.load(turf::Consume);
+ m_cell = Details::find(hash, m_table);
+ if (!m_cell)
+ return;
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value != Value(ValueTraits::Redirect))
+ return; // Found an existing value
+ // We've encountered a Redirect value. Help finish the migration.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 1, "[Mutator] find was redirected", uptr(m_table), 0);
+ m_table->jobCoordinator.participate();
+ // Try again using the latest root.
+ }
+ }
+
+ // Constructor: Insert or find cell
+ Mutator(ConcurrentMap_Leapfrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
+ TURF_TRACE(ConcurrentMap_Leapfrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key));
+ Hash hash = KeyTraits::hash(key);
+ for (;;) {
+ m_table = m_map.m_root.load(turf::Consume);
+ ureg overflowIdx;
+ switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+ case Details::InsertResult_InsertedNew: {
+ // We've inserted a new cell. Don't load m_cell->value.
+ return;
+ }
+ case Details::InsertResult_AlreadyFound: {
+ // The hash was already found in the table.
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value == Value(ValueTraits::Redirect)) {
+ // We've encountered a Redirect value.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value));
+ break; // Help finish the migration.
+ }
+ return; // Found an existing value
+ }
+ case Details::InsertResult_Overflow: {
+ Details::beginTableMigration(m_map, m_table, overflowIdx);
+ break;
+ }
+ }
+ // A migration has been started (either by us, or another thread). Participate until it's complete.
+ m_table->jobCoordinator.participate();
+ // Try again using the latest root.
+ }
+ }
+
+ public:
+ Value getValue() const {
+ // Return previously loaded value. Don't load it again.
+ return Value(m_value);
+ }
+
+ Value exchangeValue(Value desired) {
+ TURF_ASSERT(desired != Value(ValueTraits::NullValue));
+ TURF_ASSERT(desired != Value(ValueTraits::Redirect));
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_TRACE(ConcurrentMap_Leapfrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
+ for (;;) {
+ Value oldValue = m_value;
+ if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
+ // Exchange was successful. Return previous value.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
+ uptr(desired));
+ Value result = m_value;
+ m_value = desired; // Leave the mutator in a valid state
+ return result;
+ }
+ // The CAS failed and m_value has been updated with the latest value.
+ if (m_value != Value(ValueTraits::Redirect)) {
+ TURF_TRACE(ConcurrentMap_Leapfrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+ uptr(m_value));
+ if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
+ TURF_TRACE(ConcurrentMap_Leapfrog, 7, "[Mutator::exchangeValue] racing write inserted new value",
+ uptr(m_table), uptr(m_value));
+ }
+ // There was a racing write (or erase) to this cell.
+ // Pretend we exchanged with ourselves, and just let the racing write win.
+ return desired;
+ }
+ // We've encountered a Redirect value. Help finish the migration.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
+ Hash hash = m_cell->hash.load(turf::Relaxed);
+ for (;;) {
+ // Help complete the migration.
+ m_table->jobCoordinator.participate();
+ // Try again in the new table.
+ m_table = m_map.m_root.load(turf::Consume);
+ m_value = Value(ValueTraits::NullValue);
+ ureg overflowIdx;
+ switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+ case Details::InsertResult_AlreadyFound:
+ m_value = m_cell->value.load(turf::Consume);
+ if (m_value == Value(ValueTraits::Redirect)) {
+ TURF_TRACE(ConcurrentMap_Leapfrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+ uptr(m_value));
+ break;
+ }
+ goto breakOuter;
+ case Details::InsertResult_InsertedNew:
+ goto breakOuter;
+ case Details::InsertResult_Overflow:
+ TURF_TRACE(ConcurrentMap_Leapfrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
+ overflowIdx);
+ Details::beginTableMigration(m_map, m_table, overflowIdx);
+ break;
+ }
+ // We were redirected... again
+ }
+ breakOuter:;
+ // Try again in the new table.
+ }
+ }
+
+ void setValue(Value desired) {
+ exchangeValue(desired);
+ }
+
+ Value eraseValue() {
+ TURF_ASSERT(m_cell); // Cell must have been found or inserted
+ TURF_TRACE(ConcurrentMap_Leapfrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell));
+ for (;;) {
+ if (m_value == Value(ValueTraits::NullValue))
+ return Value(m_value);
+ TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
+ if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
+ // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
+ TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
+ Value result = m_value;
+ m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
+ return result;
+ }
+ // The CAS failed and m_value has been updated with the latest value.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+ uptr(m_cell));
+ if (m_value != Value(ValueTraits::Redirect)) {
+ // There was a racing write (or erase) to this cell.
+ // Pretend we erased nothing, and just let the racing write win.
+ return Value(ValueTraits::NullValue);
+ }
+ // We've been redirected to a new table.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
+ Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
+ for (;;) {
+ // Help complete the migration.
+ m_table->jobCoordinator.participate();
+ // Try again in the new table.
+ m_table = m_map.m_root.load(turf::Consume);
+ m_cell = Details::find(hash, m_table);
+ if (!m_cell) {
+ m_value = Value(ValueTraits::NullValue);
+ return m_value;
+ }
+ m_value = m_cell->value.load(turf::Relaxed);
+ if (m_value != Value(ValueTraits::Redirect))
+ break;
+ TURF_TRACE(ConcurrentMap_Leapfrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+ uptr(m_cell));
+ }
+ }
+ }
+ };
+
+ Mutator insertOrFind(Key key) {
+ return Mutator(*this, key);
+ }
+
+ Mutator find(Key key) {
+ return Mutator(*this, key, false);
+ }
+
+ // Lookup without creating a temporary Mutator.
+ Value get(Key key) {
+ Hash hash = KeyTraits::hash(key);
+ TURF_TRACE(ConcurrentMap_Leapfrog, 15, "[get] called", uptr(this), uptr(hash));
+ for (;;) {
+ typename Details::Table* table = m_root.load(turf::Consume);
+ typename Details::Cell* cell = Details::find(hash, table);
+ if (!cell)
+ return Value(ValueTraits::NullValue);
+ Value value = cell->value.load(turf::Consume);
+ if (value != Value(ValueTraits::Redirect))
+ return value; // Found an existing value
+ // We've been redirected to a new table. Help with the migration.
+ TURF_TRACE(ConcurrentMap_Leapfrog, 16, "[get] was redirected", uptr(table), uptr(hash));
+ table->jobCoordinator.participate();
+ // Try again in the new table.
+ }
+ }
+
+ Value set(Key key, Value desired) {
+ Mutator iter(*this, key);
+ return iter.exchangeValue(desired);
+ }
+
+ Value exchange(Key key, Value desired) {
+ Mutator iter(*this, key);
+ return iter.exchangeValue(desired);
+ }
+
+ Value erase(Key key) {
+ Mutator iter(*this, key, false);
+ return iter.eraseValue();
+ }
+
+ // The easiest way to implement an Iterator is to prevent all Redirects.
+ // The currrent Iterator does that by forbidding concurrent inserts.
+ // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
+ class Iterator {
+ private:
+ typename Details::Table* m_table;
+ ureg m_idx;
+ Key m_hash;
+ Value m_value;
+
+ public:
+ Iterator(ConcurrentMap_Leapfrog& map) {
+ // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
+ m_table = map.m_root.load(turf::Consume);
+ m_idx = -1;
+ next();
+ }
+
+ void next() {
+ TURF_ASSERT(m_table);
+ TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
+ while (++m_idx <= m_table->sizeMask) {
+ // Index still inside range of table.
+ typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
+ typename Details::Cell* cell = group->cells + (m_idx & 3);
+ m_hash = cell->hash.load(turf::Relaxed);
+ if (m_hash != KeyTraits::NullHash) {
+ // Cell has been reserved.
+ m_value = cell->value.load(turf::Relaxed);
+ TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
+ if (m_value != Value(ValueTraits::NullValue))
+ return; // Yield this cell.
+ }
+ }
+ // That's the end of the map.
+ m_hash = KeyTraits::NullHash;
+ m_value = Value(ValueTraits::NullValue);
+ }
+
+ bool isValid() const {
+ return m_value != Value(ValueTraits::NullValue);
+ }
+
+ Key getKey() const {
+ TURF_ASSERT(isValid());
+ // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
+ return KeyTraits::dehash(m_hash);
+ }
+
+ Value getValue() const {
+ TURF_ASSERT(isValid());
+ return m_value;
+ }
+ };
+};
+
+} // namespace junction
+
+#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H
+++ /dev/null
-/*------------------------------------------------------------------------
- Junction: Concurrent data structures in C++
- Copyright (c) 2016 Jeff Preshing
-
- Distributed under the Simplified BSD License.
- Original location: https://github.com/preshing/junction
-
- This software is distributed WITHOUT ANY WARRANTY; without even the
- implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#include <junction/Core.h>
-#include <junction/details/LeapFrog.h>
-#include <turf/Heap.h>
-
-namespace junction {
-namespace details {
-
-TURF_TRACE_DEFINE_BEGIN(LeapFrog, 33) // autogenerated by TidySource.py
-TURF_TRACE_DEFINE("[find] called")
-TURF_TRACE_DEFINE("[find] found existing cell optimistically")
-TURF_TRACE_DEFINE("[find] found existing cell")
-TURF_TRACE_DEFINE("[insertOrFind] called")
-TURF_TRACE_DEFINE("[insertOrFind] reserved first cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell")
-TURF_TRACE_DEFINE("[insertOrFind] found in first cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to read hash")
-TURF_TRACE_DEFINE("[insertOrFind] found in probe chain")
-TURF_TRACE_DEFINE("[insertOrFind] reserved cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell")
-TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain")
-TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket")
-TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell")
-TURF_TRACE_DEFINE("[insertOrFind] overflow")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
-TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
-TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
-TURF_TRACE_DEFINE("[migrateRange] race to insert key")
-TURF_TRACE_DEFINE("[migrateRange] race to insert value")
-TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect")
-TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected")
-TURF_TRACE_DEFINE("[migrateRange] racing update was erase")
-TURF_TRACE_DEFINE("[migrateRange] race to update migrated value")
-TURF_TRACE_DEFINE("[TableMigration::run] already ended")
-TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set")
-TURF_TRACE_DEFINE("[TableMigration::run] destination overflow")
-TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed")
-TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
-TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
-TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
-TURF_TRACE_DEFINE_END(LeapFrog, 33)
-
-} // namespace details
-} // namespace junction
+++ /dev/null
-/*------------------------------------------------------------------------
- Junction: Concurrent data structures in C++
- Copyright (c) 2016 Jeff Preshing
-
- Distributed under the Simplified BSD License.
- Original location: https://github.com/preshing/junction
-
- This software is distributed WITHOUT ANY WARRANTY; without even the
- implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_DETAILS_LEAPFROG_H
-#define JUNCTION_DETAILS_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <turf/Atomic.h>
-#include <turf/Mutex.h>
-#include <turf/ManualResetEvent.h>
-#include <turf/Util.h>
-#include <junction/MapTraits.h>
-#include <turf/Trace.h>
-#include <turf/Heap.h>
-#include <junction/SimpleJobCoordinator.h>
-#include <junction/QSBR.h>
-
-// Enable this to force migration overflows (for test purposes):
-#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
-
-namespace junction {
-namespace details {
-
-TURF_TRACE_DECLARE(LeapFrog, 33)
-
-template <class Map>
-struct LeapFrog {
- typedef typename Map::Hash Hash;
- typedef typename Map::Value Value;
- typedef typename Map::KeyTraits KeyTraits;
- typedef typename Map::ValueTraits ValueTraits;
-
- static const ureg InitialSize = 8;
- static const ureg TableMigrationUnitSize = 32;
- static const ureg LinearSearchLimit = 128;
- static const ureg CellsInUseSample = LinearSearchLimit;
- TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
- TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
-
- struct Cell {
- turf::Atomic<Hash> hash;
- turf::Atomic<Value> value;
- };
-
- struct CellGroup {
- // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
- // Each cell in the probe chain is located within the table itself.
- // "deltas" determines the index of the next cell in the probe chain.
- // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
- // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
- // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket.
- turf::Atomic<u8> deltas[8];
- Cell cells[4];
- };
-
- struct Table {
- const ureg sizeMask; // a power of two minus one
- turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
- SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
-
- Table(ureg sizeMask) : sizeMask(sizeMask) {
- }
-
- static Table* create(ureg tableSize) {
- TURF_ASSERT(turf::util::isPowerOf2(tableSize));
- TURF_ASSERT(tableSize >= 4);
- ureg numGroups = tableSize >> 2;
- Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
- new (table) Table(tableSize - 1);
- for (ureg i = 0; i < numGroups; i++) {
- CellGroup* group = table->getCellGroups() + i;
- for (ureg j = 0; j < 4; j++) {
- group->deltas[j].storeNonatomic(0);
- group->deltas[j + 4].storeNonatomic(0);
- group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
- group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
- }
- }
- return table;
- }
-
- void destroy() {
- this->Table::~Table();
- TURF_HEAP.free(this);
- }
-
- CellGroup* getCellGroups() const {
- return (CellGroup*) (this + 1);
- }
-
- ureg getNumMigrationUnits() const {
- return sizeMask / TableMigrationUnitSize + 1;
- }
- };
-
- class TableMigration : public SimpleJobCoordinator::Job {
- public:
- struct Source {
- Table* table;
- turf::Atomic<ureg> sourceIndex;
- };
-
- Map& m_map;
- Table* m_destination;
- turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
- turf::Atomic<bool> m_overflowed;
- turf::Atomic<sreg> m_unitsRemaining;
- ureg m_numSources;
-
- TableMigration(Map& map) : m_map(map) {
- }
-
- static TableMigration* create(Map& map, ureg numSources) {
- TableMigration* migration =
- (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
- new (migration) TableMigration(map);
- migration->m_workerStatus.storeNonatomic(0);
- migration->m_overflowed.storeNonatomic(false);
- migration->m_unitsRemaining.storeNonatomic(0);
- migration->m_numSources = numSources;
- // Caller is responsible for filling in sources & destination
- return migration;
- }
-
- virtual ~TableMigration() TURF_OVERRIDE {
- }
-
- void destroy() {
- // Destroy all source tables.
- for (ureg i = 0; i < m_numSources; i++)
- if (getSources()[i].table)
- getSources()[i].table->destroy();
- // Delete the migration object itself.
- this->TableMigration::~TableMigration();
- TURF_HEAP.free(this);
- }
-
- Source* getSources() const {
- return (Source*) (this + 1);
- }
-
- bool migrateRange(Table* srcTable, ureg startIdx);
- virtual void run() TURF_OVERRIDE;
- };
-
- static Cell* find(Hash hash, Table* table) {
- TURF_TRACE(LeapFrog, 0, "[find] called", uptr(table), hash);
- TURF_ASSERT(table);
- TURF_ASSERT(hash != KeyTraits::NullHash);
- ureg sizeMask = table->sizeMask;
- // Optimistically check hashed cell even though it might belong to another bucket
- ureg idx = hash & sizeMask;
- CellGroup* group = table->getCellGroups() + (idx >> 2);
- Cell* cell = group->cells + (idx & 3);
- Hash probeHash = cell->hash.load(turf::Relaxed);
- if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
- return cell;
- } else if (probeHash == KeyTraits::NullHash) {
- return cell = NULL;
- }
- // Follow probe chain for our bucket
- u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
- while (delta) {
- idx = (idx + delta) & sizeMask;
- group = table->getCellGroups() + (idx >> 2);
- cell = group->cells + (idx & 3);
- Hash probeHash = cell->hash.load(turf::Relaxed);
- // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
- // but we don't check for it. We just follow the probe chain.
- if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 2, "[find] found existing cell", uptr(table), idx);
- return cell;
- }
- delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
- }
- // End of probe chain, not found
- return NULL;
- }
-
- // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
- enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
- static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
- TURF_TRACE(LeapFrog, 3, "[insertOrFind] called", uptr(table), hash);
- TURF_ASSERT(table);
- TURF_ASSERT(hash != KeyTraits::NullHash);
- ureg sizeMask = table->sizeMask;
- ureg idx = ureg(hash);
-
- // Check hashed cell first, though it may not even belong to the bucket.
- CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- cell = group->cells + (idx & 3);
- Hash probeHash = cell->hash.load(turf::Relaxed);
- if (probeHash == KeyTraits::NullHash) {
- if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
- TURF_TRACE(LeapFrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
- // There are no links to set. We're done.
- return InsertResult_InsertedNew;
- } else {
- TURF_TRACE(LeapFrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
- // Fall through to check if it was the same hash...
- }
- }
- if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
- return InsertResult_AlreadyFound;
- }
-
- // Follow the link chain for this bucket.
- ureg maxIdx = idx + sizeMask;
- ureg linkLevel = 0;
- turf::Atomic<u8>* prevLink;
- for (;;) {
- followLink:
- prevLink = group->deltas + ((idx & 3) + linkLevel);
- linkLevel = 4;
- u8 probeDelta = prevLink->load(turf::Relaxed);
- if (probeDelta) {
- idx += probeDelta;
- // Check the hash for this cell.
- group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- cell = group->cells + (idx & 3);
- probeHash = cell->hash.load(turf::Relaxed);
- if (probeHash == KeyTraits::NullHash) {
- // Cell was linked, but hash is not visible yet.
- // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
- // just poll until it becomes visible.
- TURF_TRACE(LeapFrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
- do {
- probeHash = cell->hash.load(turf::Acquire);
- } while (probeHash == KeyTraits::NullHash);
- }
- TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
- if (probeHash == hash) {
- TURF_TRACE(LeapFrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
- return InsertResult_AlreadyFound;
- }
- } else {
- // Reached the end of the link chain for this bucket.
- // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
- ureg prevLinkIdx = idx;
- TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
- ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
- while (linearProbesRemaining-- > 0) {
- idx++;
- group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- cell = group->cells + (idx & 3);
- probeHash = cell->hash.load(turf::Relaxed);
- if (probeHash == KeyTraits::NullHash) {
- // It's an empty cell. Try to reserve it.
- if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
- // Success. We've reserved the cell. Link it to previous cell in same bucket.
- TURF_TRACE(LeapFrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
- TURF_ASSERT(probeDelta == 0);
- u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
- probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
- TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
-#else
- prevLink->store(desiredDelta, turf::Relaxed);
-#endif
- return InsertResult_InsertedNew;
- } else {
- TURF_TRACE(LeapFrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
- // Fall through to check if it's the same hash...
- }
- }
- Hash x = (probeHash ^ hash);
- // Check for same hash.
- if (!x) {
- TURF_TRACE(LeapFrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
- return InsertResult_AlreadyFound;
- }
- // Check for same bucket.
- if ((x & sizeMask) == 0) {
- TURF_TRACE(LeapFrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
- // Attempt to set the link on behalf of the late-arriving cell.
- // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
- // there's no guarantee that our own link chain will be well-formed by the time this function returns.
- // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
- u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
- probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
- TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
- if (probeDelta == 0)
- TURF_TRACE(LeapFrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
-#else
- prevLink->store(desiredDelta, turf::Relaxed);
-#endif
- goto followLink; // Try to follow link chain for the bucket again.
- }
- // Continue linear search...
- }
- // Table is too full to insert.
- overflowIdx = idx + 1;
- TURF_TRACE(LeapFrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
- return InsertResult_Overflow;
- }
- }
- }
-
- static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
- // Create new migration by DCLI.
- TURF_TRACE(LeapFrog, 15, "[beginTableMigrationToSize] called", 0, 0);
- SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
- if (job) {
- TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
- } else {
- turf::LockGuard<turf::Mutex> guard(table->mutex);
- job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
- if (job) {
- TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
- } else {
- // Create new migration.
- TableMigration* migration = TableMigration::create(map, 1);
- migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
- migration->getSources()[0].table = table;
- migration->getSources()[0].sourceIndex.storeNonatomic(0);
- migration->m_destination = Table::create(nextTableSize);
- // Publish the new migration.
- table->jobCoordinator.storeRelease(migration);
- }
- }
- }
-
- static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
- // Estimate number of cells in use based on a small sample.
- ureg sizeMask = table->sizeMask;
- ureg idx = overflowIdx - CellsInUseSample;
- ureg inUseCells = 0;
- for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
- CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
- Cell* cell = group->cells + (idx & 3);
- Value value = cell->value.load(turf::Relaxed);
- if (value == Value(ValueTraits::Redirect)) {
- // Another thread kicked off the jobCoordinator. The caller will participate upon return.
- TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
- return;
- }
- if (value != Value(ValueTraits::NullValue))
- inUseCells++;
- idx++;
- }
- float inUseRatio = float(inUseCells) / CellsInUseSample;
- float estimatedInUse = (sizeMask + 1) * inUseRatio;
-#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
- // Periodically underestimate the number of cells in use.
- // This exercises the code that handles overflow during migration.
- static ureg counter = 1;
- if ((++counter & 3) == 0) {
- estimatedInUse /= 4;
- }
-#endif
- ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
- beginTableMigrationToSize(map, table, nextTableSize);
- }
-}; // LeapFrog
-
-template <class Map>
-bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
- ureg srcSizeMask = srcTable->sizeMask;
- ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
- // Iterate over source range.
- for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
- CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
- Cell* srcCell = srcGroup->cells + (srcIdx & 3);
- Hash srcHash;
- Value srcValue;
- // Fetch the srcHash and srcValue.
- for (;;) {
- srcHash = srcCell->hash.load(turf::Relaxed);
- if (srcHash == KeyTraits::NullHash) {
- // An unused cell. Try to put a Redirect marker in its value.
- srcValue =
- srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
- if (srcValue == Value(ValueTraits::Redirect)) {
- // srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
- break;
- }
- if (srcValue == Value(ValueTraits::NullValue))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
- // Otherwise, somebody just claimed the cell. Read srcHash again...
- } else {
- // Check for deleted/uninitialized value.
- srcValue = srcCell->value.load(turf::Relaxed);
- if (srcValue == Value(ValueTraits::NullValue)) {
- // Try to put a Redirect marker.
- if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
- break; // Redirect has been placed. Break inner loop, continue outer loop.
- TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
- if (srcValue == Value(ValueTraits::Redirect)) {
- // FIXME: I don't think this will happen. Investigate & change to assert
- TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
- break;
- }
- } else if (srcValue == Value(ValueTraits::Redirect)) {
- // srcValue is already marked Redirect due to previous incomplete migration.
- TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
- break;
- }
-
- // We've got a key/value pair to migrate.
- // Reserve a destination cell in the destination.
- TURF_ASSERT(srcHash != KeyTraits::NullHash);
- TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
- TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
- Cell* dstCell;
- ureg overflowIdx;
- InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
- // During migration, a hash can only exist in one place among all the source tables,
- // and it is only migrated by one thread. Therefore, the hash will never already exist
- // in the destination table:
- TURF_ASSERT(result != InsertResult_AlreadyFound);
- if (result == InsertResult_Overflow) {
- // Destination overflow.
- // This can happen for several reasons. For example, the source table could have
- // existed of all deleted cells when it overflowed, resulting in a small destination
- // table size, but then another thread could re-insert all the same hashes
- // before the migration completed.
- // Caller will cancel the current migration and begin a new one.
- return false;
- }
- // Migrate the old value to the new cell.
- for (;;) {
- // Copy srcValue to the destination.
- dstCell->value.store(srcValue, turf::Relaxed);
- // Try to place a Redirect marker in srcValue.
- Value doubleCheckedSrcValue =
- srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
- TURF_ASSERT(doubleCheckedSrcValue !=
- Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
- if (doubleCheckedSrcValue == srcValue) {
- // No racing writes to the src. We've successfully placed the Redirect marker.
- // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
- // by a late-arriving erase.
- if (srcValue == Value(ValueTraits::NullValue))
- TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
- break;
- }
- // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
- TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
- srcValue = doubleCheckedSrcValue;
- }
- // Cell successfully migrated. Proceed to next source cell.
- break;
- }
- }
- }
- // Range has been migrated successfully.
- return true;
-}
-
-template <class Map>
-void LeapFrog<Map>::TableMigration::run() {
- // Conditionally increment the shared # of workers.
- ureg probeStatus = m_workerStatus.load(turf::Relaxed);
- do {
- if (probeStatus & 1) {
- // End flag is already set, so do nothing.
- TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
- return;
- }
- } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
- // # of workers has been incremented, and the end flag is clear.
- TURF_ASSERT((probeStatus & 1) == 0);
-
- // Iterate over all source tables.
- for (ureg s = 0; s < m_numSources; s++) {
- Source& source = getSources()[s];
- // Loop over all migration units in this source table.
- for (;;) {
- if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
- goto endMigration;
- }
- ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
- if (startIdx >= source.table->sizeMask + 1)
- break; // No more migration units in this table. Try next source table.
- bool overflowed = !migrateRange(source.table, startIdx);
- if (overflowed) {
- // *** FAILED MIGRATION ***
- // TableMigration failed due to destination table overflow.
- // No other thread can declare the migration successful at this point, because *this* unit will never complete,
- // hence m_unitsRemaining won't reach zero.
- // However, multiple threads can independently detect a failed migration at the same time.
- TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
- // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
- // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
- // the thread
- // that deals with it.
- bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
- if (oldOverflowed)
- TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
- uptr(oldOverflowed));
- m_workerStatus.fetchOr(1, turf::Relaxed);
- goto endMigration;
- }
- sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
- TURF_ASSERT(prevRemaining > 0);
- if (prevRemaining == 1) {
- // *** SUCCESSFUL MIGRATION ***
- // That was the last chunk to migrate.
- m_workerStatus.fetchOr(1, turf::Relaxed);
- goto endMigration;
- }
- }
- }
- TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
-
-endMigration:
- // Decrement the shared # of workers.
- probeStatus = m_workerStatus.fetchSub(
- 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
- if (probeStatus >= 4) {
- // There are other workers remaining. Return here so that only the very last worker will proceed.
- TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
- return;
- }
-
- // We're the very last worker thread.
- // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
- TURF_ASSERT(probeStatus == 3);
- bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
- if (!overflowed) {
- // The migration succeeded. This is the most likely outcome. Publish the new subtree.
- m_map.publishTableMigration(this);
- // End the jobCoodinator.
- getSources()[0].table->jobCoordinator.end();
- } else {
- // The migration failed due to the overflow of the destination table.
- Table* origTable = getSources()[0].table;
- turf::LockGuard<turf::Mutex> guard(origTable->mutex);
- SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
- if (checkedJob != this) {
- TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
- uptr(checkedJob));
- } else {
- TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
- // Double the destination table size.
- migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
- // Transfer source tables to the new migration.
- for (ureg i = 0; i < m_numSources; i++) {
- migration->getSources()[i].table = getSources()[i].table;
- getSources()[i].table = NULL;
- migration->getSources()[i].sourceIndex.storeNonatomic(0);
- }
- migration->getSources()[m_numSources].table = m_destination;
- migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
- // Calculate total number of migration units to move.
- ureg unitsRemaining = 0;
- for (ureg s = 0; s < migration->m_numSources; s++)
- unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
- migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
- // Publish the new migration.
- origTable->jobCoordinator.storeRelease(migration);
- }
- }
-
- // We're done with this TableMigration. Queue it for GC.
- DefaultQSBR.enqueue(&TableMigration::destroy, this);
-}
-
-} // namespace details
-} // namespace junction
-
-#endif // JUNCTION_DETAILS_LEAPFROG_H
--- /dev/null
+/*------------------------------------------------------------------------
+ Junction: Concurrent data structures in C++
+ Copyright (c) 2016 Jeff Preshing
+
+ Distributed under the Simplified BSD License.
+ Original location: https://github.com/preshing/junction
+
+ This software is distributed WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#include <junction/Core.h>
+#include <junction/details/Leapfrog.h>
+#include <turf/Heap.h>
+
+namespace junction {
+namespace details {
+
+TURF_TRACE_DEFINE_BEGIN(Leapfrog, 33) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE("[find] called")
+TURF_TRACE_DEFINE("[find] found existing cell optimistically")
+TURF_TRACE_DEFINE("[find] found existing cell")
+TURF_TRACE_DEFINE("[insertOrFind] called")
+TURF_TRACE_DEFINE("[insertOrFind] reserved first cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell")
+TURF_TRACE_DEFINE("[insertOrFind] found in first cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to read hash")
+TURF_TRACE_DEFINE("[insertOrFind] found in probe chain")
+TURF_TRACE_DEFINE("[insertOrFind] reserved cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell")
+TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain")
+TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket")
+TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell")
+TURF_TRACE_DEFINE("[insertOrFind] overflow")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
+TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
+TURF_TRACE_DEFINE("[migrateRange] race to insert key")
+TURF_TRACE_DEFINE("[migrateRange] race to insert value")
+TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect")
+TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected")
+TURF_TRACE_DEFINE("[migrateRange] racing update was erase")
+TURF_TRACE_DEFINE("[migrateRange] race to update migrated value")
+TURF_TRACE_DEFINE("[TableMigration::run] already ended")
+TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set")
+TURF_TRACE_DEFINE("[TableMigration::run] destination overflow")
+TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed")
+TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
+TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
+TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
+TURF_TRACE_DEFINE_END(Leapfrog, 33)
+
+} // namespace details
+} // namespace junction
--- /dev/null
+/*------------------------------------------------------------------------
+ Junction: Concurrent data structures in C++
+ Copyright (c) 2016 Jeff Preshing
+
+ Distributed under the Simplified BSD License.
+ Original location: https://github.com/preshing/junction
+
+ This software is distributed WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_DETAILS_LEAPFROG_H
+#define JUNCTION_DETAILS_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <turf/Atomic.h>
+#include <turf/Mutex.h>
+#include <turf/ManualResetEvent.h>
+#include <turf/Util.h>
+#include <junction/MapTraits.h>
+#include <turf/Trace.h>
+#include <turf/Heap.h>
+#include <junction/SimpleJobCoordinator.h>
+#include <junction/QSBR.h>
+
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
+
+namespace junction {
+namespace details {
+
+TURF_TRACE_DECLARE(Leapfrog, 33)
+
+template <class Map>
+struct Leapfrog {
+ typedef typename Map::Hash Hash;
+ typedef typename Map::Value Value;
+ typedef typename Map::KeyTraits KeyTraits;
+ typedef typename Map::ValueTraits ValueTraits;
+
+ static const ureg InitialSize = 8;
+ static const ureg TableMigrationUnitSize = 32;
+ static const ureg LinearSearchLimit = 128;
+ static const ureg CellsInUseSample = LinearSearchLimit;
+ TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256); // Must fit in CellGroup::links
+ TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
+
+ struct Cell {
+ turf::Atomic<Hash> hash;
+ turf::Atomic<Value> value;
+ };
+
+ struct CellGroup {
+ // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
+ // Each cell in the probe chain is located within the table itself.
+ // "deltas" determines the index of the next cell in the probe chain.
+ // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
+ // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
+ // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket.
+ turf::Atomic<u8> deltas[8];
+ Cell cells[4];
+ };
+
+ struct Table {
+ const ureg sizeMask; // a power of two minus one
+ turf::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
+ SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+
+ Table(ureg sizeMask) : sizeMask(sizeMask) {
+ }
+
+ static Table* create(ureg tableSize) {
+ TURF_ASSERT(turf::util::isPowerOf2(tableSize));
+ TURF_ASSERT(tableSize >= 4);
+ ureg numGroups = tableSize >> 2;
+ Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
+ new (table) Table(tableSize - 1);
+ for (ureg i = 0; i < numGroups; i++) {
+ CellGroup* group = table->getCellGroups() + i;
+ for (ureg j = 0; j < 4; j++) {
+ group->deltas[j].storeNonatomic(0);
+ group->deltas[j + 4].storeNonatomic(0);
+ group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
+ group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
+ }
+ }
+ return table;
+ }
+
+ void destroy() {
+ this->Table::~Table();
+ TURF_HEAP.free(this);
+ }
+
+ CellGroup* getCellGroups() const {
+ return (CellGroup*) (this + 1);
+ }
+
+ ureg getNumMigrationUnits() const {
+ return sizeMask / TableMigrationUnitSize + 1;
+ }
+ };
+
+ class TableMigration : public SimpleJobCoordinator::Job {
+ public:
+ struct Source {
+ Table* table;
+ turf::Atomic<ureg> sourceIndex;
+ };
+
+ Map& m_map;
+ Table* m_destination;
+ turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+ turf::Atomic<bool> m_overflowed;
+ turf::Atomic<sreg> m_unitsRemaining;
+ ureg m_numSources;
+
+ TableMigration(Map& map) : m_map(map) {
+ }
+
+ static TableMigration* create(Map& map, ureg numSources) {
+ TableMigration* migration =
+ (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+ new (migration) TableMigration(map);
+ migration->m_workerStatus.storeNonatomic(0);
+ migration->m_overflowed.storeNonatomic(false);
+ migration->m_unitsRemaining.storeNonatomic(0);
+ migration->m_numSources = numSources;
+ // Caller is responsible for filling in sources & destination
+ return migration;
+ }
+
+ virtual ~TableMigration() TURF_OVERRIDE {
+ }
+
+ void destroy() {
+ // Destroy all source tables.
+ for (ureg i = 0; i < m_numSources; i++)
+ if (getSources()[i].table)
+ getSources()[i].table->destroy();
+ // Delete the migration object itself.
+ this->TableMigration::~TableMigration();
+ TURF_HEAP.free(this);
+ }
+
+ Source* getSources() const {
+ return (Source*) (this + 1);
+ }
+
+ bool migrateRange(Table* srcTable, ureg startIdx);
+ virtual void run() TURF_OVERRIDE;
+ };
+
+ static Cell* find(Hash hash, Table* table) {
+ TURF_TRACE(Leapfrog, 0, "[find] called", uptr(table), hash);
+ TURF_ASSERT(table);
+ TURF_ASSERT(hash != KeyTraits::NullHash);
+ ureg sizeMask = table->sizeMask;
+ // Optimistically check hashed cell even though it might belong to another bucket
+ ureg idx = hash & sizeMask;
+ CellGroup* group = table->getCellGroups() + (idx >> 2);
+ Cell* cell = group->cells + (idx & 3);
+ Hash probeHash = cell->hash.load(turf::Relaxed);
+ if (probeHash == hash) {
+ TURF_TRACE(Leapfrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
+ return cell;
+ } else if (probeHash == KeyTraits::NullHash) {
+ return cell = NULL;
+ }
+ // Follow probe chain for our bucket
+ u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
+ while (delta) {
+ idx = (idx + delta) & sizeMask;
+ group = table->getCellGroups() + (idx >> 2);
+ cell = group->cells + (idx & 3);
+ Hash probeHash = cell->hash.load(turf::Relaxed);
+ // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
+ // but we don't check for it. We just follow the probe chain.
+ if (probeHash == hash) {
+ TURF_TRACE(Leapfrog, 2, "[find] found existing cell", uptr(table), idx);
+ return cell;
+ }
+ delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
+ }
+ // End of probe chain, not found
+ return NULL;
+ }
+
+ // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
+ enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
+ static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
+ TURF_TRACE(Leapfrog, 3, "[insertOrFind] called", uptr(table), hash);
+ TURF_ASSERT(table);
+ TURF_ASSERT(hash != KeyTraits::NullHash);
+ ureg sizeMask = table->sizeMask;
+ ureg idx = ureg(hash);
+
+ // Check hashed cell first, though it may not even belong to the bucket.
+ CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ cell = group->cells + (idx & 3);
+ Hash probeHash = cell->hash.load(turf::Relaxed);
+ if (probeHash == KeyTraits::NullHash) {
+ if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
+ TURF_TRACE(Leapfrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
+ // There are no links to set. We're done.
+ return InsertResult_InsertedNew;
+ } else {
+ TURF_TRACE(Leapfrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
+ // Fall through to check if it was the same hash...
+ }
+ }
+ if (probeHash == hash) {
+ TURF_TRACE(Leapfrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
+ return InsertResult_AlreadyFound;
+ }
+
+ // Follow the link chain for this bucket.
+ ureg maxIdx = idx + sizeMask;
+ ureg linkLevel = 0;
+ turf::Atomic<u8>* prevLink;
+ for (;;) {
+ followLink:
+ prevLink = group->deltas + ((idx & 3) + linkLevel);
+ linkLevel = 4;
+ u8 probeDelta = prevLink->load(turf::Relaxed);
+ if (probeDelta) {
+ idx += probeDelta;
+ // Check the hash for this cell.
+ group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ cell = group->cells + (idx & 3);
+ probeHash = cell->hash.load(turf::Relaxed);
+ if (probeHash == KeyTraits::NullHash) {
+ // Cell was linked, but hash is not visible yet.
+ // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
+ // just poll until it becomes visible.
+ TURF_TRACE(Leapfrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
+ do {
+ probeHash = cell->hash.load(turf::Acquire);
+ } while (probeHash == KeyTraits::NullHash);
+ }
+ TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
+ if (probeHash == hash) {
+ TURF_TRACE(Leapfrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
+ return InsertResult_AlreadyFound;
+ }
+ } else {
+ // Reached the end of the link chain for this bucket.
+ // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
+ ureg prevLinkIdx = idx;
+ TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
+ ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
+ while (linearProbesRemaining-- > 0) {
+ idx++;
+ group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ cell = group->cells + (idx & 3);
+ probeHash = cell->hash.load(turf::Relaxed);
+ if (probeHash == KeyTraits::NullHash) {
+ // It's an empty cell. Try to reserve it.
+ if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
+ // Success. We've reserved the cell. Link it to previous cell in same bucket.
+ TURF_TRACE(Leapfrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
+ TURF_ASSERT(probeDelta == 0);
+ u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
+ probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
+ TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
+#else
+ prevLink->store(desiredDelta, turf::Relaxed);
+#endif
+ return InsertResult_InsertedNew;
+ } else {
+ TURF_TRACE(Leapfrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
+ // Fall through to check if it's the same hash...
+ }
+ }
+ Hash x = (probeHash ^ hash);
+ // Check for same hash.
+ if (!x) {
+ TURF_TRACE(Leapfrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
+ return InsertResult_AlreadyFound;
+ }
+ // Check for same bucket.
+ if ((x & sizeMask) == 0) {
+ TURF_TRACE(Leapfrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
+ // Attempt to set the link on behalf of the late-arriving cell.
+ // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
+ // there's no guarantee that our own link chain will be well-formed by the time this function returns.
+ // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
+ u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
+ probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
+ TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
+ if (probeDelta == 0)
+ TURF_TRACE(Leapfrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
+#else
+ prevLink->store(desiredDelta, turf::Relaxed);
+#endif
+ goto followLink; // Try to follow link chain for the bucket again.
+ }
+ // Continue linear search...
+ }
+ // Table is too full to insert.
+ overflowIdx = idx + 1;
+ TURF_TRACE(Leapfrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
+ return InsertResult_Overflow;
+ }
+ }
+ }
+
+ static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
+ // Create new migration by DCLI.
+ TURF_TRACE(Leapfrog, 15, "[beginTableMigrationToSize] called", 0, 0);
+ SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
+ if (job) {
+ TURF_TRACE(Leapfrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
+ } else {
+ turf::LockGuard<turf::Mutex> guard(table->mutex);
+ job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+ if (job) {
+ TURF_TRACE(Leapfrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
+ } else {
+ // Create new migration.
+ TableMigration* migration = TableMigration::create(map, 1);
+ migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
+ migration->getSources()[0].table = table;
+ migration->getSources()[0].sourceIndex.storeNonatomic(0);
+ migration->m_destination = Table::create(nextTableSize);
+ // Publish the new migration.
+ table->jobCoordinator.storeRelease(migration);
+ }
+ }
+ }
+
+ static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
+ // Estimate number of cells in use based on a small sample.
+ ureg sizeMask = table->sizeMask;
+ ureg idx = overflowIdx - CellsInUseSample;
+ ureg inUseCells = 0;
+ for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
+ CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+ Cell* cell = group->cells + (idx & 3);
+ Value value = cell->value.load(turf::Relaxed);
+ if (value == Value(ValueTraits::Redirect)) {
+ // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+ TURF_TRACE(Leapfrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
+ return;
+ }
+ if (value != Value(ValueTraits::NullValue))
+ inUseCells++;
+ idx++;
+ }
+ float inUseRatio = float(inUseCells) / CellsInUseSample;
+ float estimatedInUse = (sizeMask + 1) * inUseRatio;
+#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
+ // Periodically underestimate the number of cells in use.
+ // This exercises the code that handles overflow during migration.
+ static ureg counter = 1;
+ if ((++counter & 3) == 0) {
+ estimatedInUse /= 4;
+ }
+#endif
+ ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+ beginTableMigrationToSize(map, table, nextTableSize);
+ }
+}; // Leapfrog
+
+template <class Map>
+bool Leapfrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
+ ureg srcSizeMask = srcTable->sizeMask;
+ ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
+ // Iterate over source range.
+ for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
+ CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
+ Cell* srcCell = srcGroup->cells + (srcIdx & 3);
+ Hash srcHash;
+ Value srcValue;
+ // Fetch the srcHash and srcValue.
+ for (;;) {
+ srcHash = srcCell->hash.load(turf::Relaxed);
+ if (srcHash == KeyTraits::NullHash) {
+ // An unused cell. Try to put a Redirect marker in its value.
+ srcValue =
+ srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+ if (srcValue == Value(ValueTraits::Redirect)) {
+ // srcValue is already marked Redirect due to previous incomplete migration.
+ TURF_TRACE(Leapfrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+ break;
+ }
+ if (srcValue == Value(ValueTraits::NullValue))
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
+ TURF_TRACE(Leapfrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+ // Otherwise, somebody just claimed the cell. Read srcHash again...
+ } else {
+ // Check for deleted/uninitialized value.
+ srcValue = srcCell->value.load(turf::Relaxed);
+ if (srcValue == Value(ValueTraits::NullValue)) {
+ // Try to put a Redirect marker.
+ if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
+ break; // Redirect has been placed. Break inner loop, continue outer loop.
+ TURF_TRACE(Leapfrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+ if (srcValue == Value(ValueTraits::Redirect)) {
+ // FIXME: I don't think this will happen. Investigate & change to assert
+ TURF_TRACE(Leapfrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+ break;
+ }
+ } else if (srcValue == Value(ValueTraits::Redirect)) {
+ // srcValue is already marked Redirect due to previous incomplete migration.
+ TURF_TRACE(Leapfrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+ break;
+ }
+
+ // We've got a key/value pair to migrate.
+ // Reserve a destination cell in the destination.
+ TURF_ASSERT(srcHash != KeyTraits::NullHash);
+ TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
+ TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
+ Cell* dstCell;
+ ureg overflowIdx;
+ InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
+ // During migration, a hash can only exist in one place among all the source tables,
+ // and it is only migrated by one thread. Therefore, the hash will never already exist
+ // in the destination table:
+ TURF_ASSERT(result != InsertResult_AlreadyFound);
+ if (result == InsertResult_Overflow) {
+ // Destination overflow.
+ // This can happen for several reasons. For example, the source table could have
+ // existed of all deleted cells when it overflowed, resulting in a small destination
+ // table size, but then another thread could re-insert all the same hashes
+ // before the migration completed.
+ // Caller will cancel the current migration and begin a new one.
+ return false;
+ }
+ // Migrate the old value to the new cell.
+ for (;;) {
+ // Copy srcValue to the destination.
+ dstCell->value.store(srcValue, turf::Relaxed);
+ // Try to place a Redirect marker in srcValue.
+ Value doubleCheckedSrcValue =
+ srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+ TURF_ASSERT(doubleCheckedSrcValue !=
+ Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+ if (doubleCheckedSrcValue == srcValue) {
+ // No racing writes to the src. We've successfully placed the Redirect marker.
+ // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
+ // by a late-arriving erase.
+ if (srcValue == Value(ValueTraits::NullValue))
+ TURF_TRACE(Leapfrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+ break;
+ }
+ // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
+ TURF_TRACE(Leapfrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+ srcValue = doubleCheckedSrcValue;
+ }
+ // Cell successfully migrated. Proceed to next source cell.
+ break;
+ }
+ }
+ }
+ // Range has been migrated successfully.
+ return true;
+}
+
+template <class Map>
+void Leapfrog<Map>::TableMigration::run() {
+ // Conditionally increment the shared # of workers.
+ ureg probeStatus = m_workerStatus.load(turf::Relaxed);
+ do {
+ if (probeStatus & 1) {
+ // End flag is already set, so do nothing.
+ TURF_TRACE(Leapfrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
+ return;
+ }
+ } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
+ // # of workers has been incremented, and the end flag is clear.
+ TURF_ASSERT((probeStatus & 1) == 0);
+
+ // Iterate over all source tables.
+ for (ureg s = 0; s < m_numSources; s++) {
+ Source& source = getSources()[s];
+ // Loop over all migration units in this source table.
+ for (;;) {
+ if (m_workerStatus.load(turf::Relaxed) & 1) {
+ TURF_TRACE(Leapfrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ goto endMigration;
+ }
+ ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+ if (startIdx >= source.table->sizeMask + 1)
+ break; // No more migration units in this table. Try next source table.
+ bool overflowed = !migrateRange(source.table, startIdx);
+ if (overflowed) {
+ // *** FAILED MIGRATION ***
+ // TableMigration failed due to destination table overflow.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
+ // However, multiple threads can independently detect a failed migration at the same time.
+ TURF_TRACE(Leapfrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
+ // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+ // the thread
+ // that deals with it.
+ bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+ if (oldOverflowed)
+ TURF_TRACE(Leapfrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ uptr(oldOverflowed));
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }
+ sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+ TURF_ASSERT(prevRemaining > 0);
+ if (prevRemaining == 1) {
+ // *** SUCCESSFUL MIGRATION ***
+ // That was the last chunk to migrate.
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }
+ }
+ }
+ TURF_TRACE(Leapfrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
+
+endMigration:
+ // Decrement the shared # of workers.
+ probeStatus = m_workerStatus.fetchSub(
+ 2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+ if (probeStatus >= 4) {
+ // There are other workers remaining. Return here so that only the very last worker will proceed.
+ TURF_TRACE(Leapfrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+ return;
+ }
+
+ // We're the very last worker thread.
+ // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
+ TURF_ASSERT(probeStatus == 3);
+ bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+ if (!overflowed) {
+ // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+ m_map.publishTableMigration(this);
+ // End the jobCoodinator.
+ getSources()[0].table->jobCoordinator.end();
+ } else {
+ // The migration failed due to the overflow of the destination table.
+ Table* origTable = getSources()[0].table;
+ turf::LockGuard<turf::Mutex> guard(origTable->mutex);
+ SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
+ if (checkedJob != this) {
+ TURF_TRACE(Leapfrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+ uptr(checkedJob));
+ } else {
+ TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
+ // Double the destination table size.
+ migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
+ // Transfer source tables to the new migration.
+ for (ureg i = 0; i < m_numSources; i++) {
+ migration->getSources()[i].table = getSources()[i].table;
+ getSources()[i].table = NULL;
+ migration->getSources()[i].sourceIndex.storeNonatomic(0);
+ }
+ migration->getSources()[m_numSources].table = m_destination;
+ migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
+ // Calculate total number of migration units to move.
+ ureg unitsRemaining = 0;
+ for (ureg s = 0; s < migration->m_numSources; s++)
+ unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
+ migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
+ // Publish the new migration.
+ origTable->jobCoordinator.storeRelease(migration);
+ }
+ }
+
+ // We're done with this TableMigration. Queue it for GC.
+ DefaultQSBR.enqueue(&TableMigration::destroy, this);
+}
+
+} // namespace details
+} // namespace junction
+
+#endif // JUNCTION_DETAILS_LEAPFROG_H
+++ /dev/null
-/*------------------------------------------------------------------------
- Junction: Concurrent data structures in C++
- Copyright (c) 2016 Jeff Preshing
-
- Distributed under the Simplified BSD License.
- Original location: https://github.com/preshing/junction
-
- This software is distributed WITHOUT ANY WARRANTY; without even the
- implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
- See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
-#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <junction/QSBR.h>
-#include <junction/ConcurrentMap_LeapFrog.h>
-#include <turf/Util.h>
-
-namespace junction {
-namespace extra {
-
-class MapAdapter {
-public:
- static TURF_CONSTEXPR const char* MapName = "Junction LeapFrog map";
-
- MapAdapter(ureg) {
- }
-
- class ThreadContext {
- private:
- QSBR::Context m_qsbrContext;
-
- public:
- ThreadContext(MapAdapter&, ureg) {
- }
-
- void registerThread() {
- m_qsbrContext = DefaultQSBR.createContext();
- }
-
- void unregisterThread() {
- DefaultQSBR.destroyContext(m_qsbrContext);
- }
-
- void update() {
- DefaultQSBR.update(m_qsbrContext);
- }
- };
-
- typedef ConcurrentMap_LeapFrog<u32, void*> Map;
-
- static ureg getInitialCapacity(ureg maxPopulation) {
- return turf::util::roundUpPowerOf2(maxPopulation / 4);
- }
-};
-
-} // namespace extra
-} // namespace junction
-
-#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
--- /dev/null
+/*------------------------------------------------------------------------
+ Junction: Concurrent data structures in C++
+ Copyright (c) 2016 Jeff Preshing
+
+ Distributed under the Simplified BSD License.
+ Original location: https://github.com/preshing/junction
+
+ This software is distributed WITHOUT ANY WARRANTY; without even the
+ implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+ See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
+#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <junction/QSBR.h>
+#include <junction/ConcurrentMap_Leapfrog.h>
+#include <turf/Util.h>
+
+namespace junction {
+namespace extra {
+
+class MapAdapter {
+public:
+ static TURF_CONSTEXPR const char* MapName = "Junction Leapfrog map";
+
+ MapAdapter(ureg) {
+ }
+
+ class ThreadContext {
+ private:
+ QSBR::Context m_qsbrContext;
+
+ public:
+ ThreadContext(MapAdapter&, ureg) {
+ }
+
+ void registerThread() {
+ m_qsbrContext = DefaultQSBR.createContext();
+ }
+
+ void unregisterThread() {
+ DefaultQSBR.destroyContext(m_qsbrContext);
+ }
+
+ void update() {
+ DefaultQSBR.update(m_qsbrContext);
+ }
+ };
+
+ typedef ConcurrentMap_Leapfrog<u32, void*> Map;
+
+ static ureg getInitialCapacity(ureg maxPopulation) {
+ return turf::util::roundUpPowerOf2(maxPopulation / 4);
+ }
+};
+
+} // namespace extra
+} // namespace junction
+
+#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
ALL_MAPS = [
('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1']),
('linear', 'junction/extra/impl/MapAdapter_Linear.h', []),
- ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', []),
+ ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', []),
('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', []),
('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', []),
('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1']),
('null', 'junction/extra/impl/MapAdapter_Null.h', [], ['-i256', '-c10']),
('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c10']),
('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i256', '-c10']),
- ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i256', '-c10']),
+ ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i256', '-c10']),
('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i256', '-c10']),
('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i256', '-c10']),
('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c1']),
ALL_MAPS = [
('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i10000', '-c200']),
('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i10000', '-c200']),
- ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i10000', '-c200']),
+ ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i10000', '-c200']),
('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i10000', '-c200']),
('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i10000', '-c10']),
('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i2000', '-c1']),