Rename LeapFrog to Leapfrog
authorJeff Preshing <filter-github@preshing.com>
Mon, 29 Feb 2016 13:59:57 +0000 (08:59 -0500)
committerJeff Preshing <filter-github@preshing.com>
Mon, 29 Feb 2016 13:59:57 +0000 (08:59 -0500)
14 files changed:
README.md
junction/ConcurrentMap_LeapFrog.cpp [deleted file]
junction/ConcurrentMap_LeapFrog.h [deleted file]
junction/ConcurrentMap_Leapfrog.cpp [new file with mode: 0644]
junction/ConcurrentMap_Leapfrog.h [new file with mode: 0644]
junction/details/LeapFrog.cpp [deleted file]
junction/details/LeapFrog.h [deleted file]
junction/details/Leapfrog.cpp [new file with mode: 0644]
junction/details/Leapfrog.h [new file with mode: 0644]
junction/extra/impl/MapAdapter_LeapFrog.h [deleted file]
junction/extra/impl/MapAdapter_Leapfrog.h [new file with mode: 0644]
samples/MapMemoryBench/TestAllMaps.py
samples/MapPerformanceTests/TestAllMaps.py
samples/MapScalabilityTests/TestAllMaps.py

index 96d4dd23b417903e73895709327efb88b4051f20..b4361a0d931b6abce227b452b7a1270c8fbc12d3 100644 (file)
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@ Junction is a library of concurrent data structures in C++. It contains several
 
     junction::ConcurrentMap_Crude
     junction::ConcurrentMap_Linear
-    junction::ConcurrentMap_LeapFrog
+    junction::ConcurrentMap_Leapfrog
     junction::ConcurrentMap_Grampa
 
 [CMake](https://cmake.org/) and [Turf](https://github.com/preshing/turf) are required. See the blog post [New Concurrent Hash Maps for C++](http://preshing.com/20160201/new-concurrent-hash-maps-for-cpp/) for more information.
@@ -103,7 +103,7 @@ A Junction map is a lot like a big array of `std::atomic<>` variables, where the
 
 * All of a Junction map's member functions, together with its `Mutator` member functions, are atomic with respect to each other, so you can safely call them from any thread without mutual exclusion.
 * If an `set` [happens before](http://preshing.com/20130702/the-happens-before-relation/) a `get` with the same key, the `get` will return the value set, except if another operation changes the value in between. Any [synchronizing operation](http://preshing.com/20130823/the-synchronizes-with-relation/) will establish this relationship.
-* For Linear, LeapFrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed.
+* For Linear, Leapfrog and Grampa maps, `set` is a [release](http://preshing.com/20120913/acquire-and-release-semantics/) operation and `get` is a [consume](http://preshing.com/20140709/the-purpose-of-memory_order_consume-in-cpp11/) operation, so you can safely pass non-atomic information between threads using a pointer. For Crude maps, all operations are relaxed.
 * In the current version, you must not set while concurrently using an `Iterator`.
 
 ## Feedback
diff --git a/junction/ConcurrentMap_LeapFrog.cpp b/junction/ConcurrentMap_LeapFrog.cpp
deleted file mode 100644 (file)
index 67a39ad..0000000
+++ /dev/null
@@ -1,37 +0,0 @@
-/*------------------------------------------------------------------------
-  Junction: Concurrent data structures in C++
-  Copyright (c) 2016 Jeff Preshing
-
-  Distributed under the Simplified BSD License.
-  Original location: https://github.com/preshing/junction
-
-  This software is distributed WITHOUT ANY WARRANTY; without even the
-  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-  See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#include <junction/ConcurrentMap_LeapFrog.h>
-
-namespace junction {
-
-TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_LeapFrog, 17) // autogenerated by TidySource.py
-TURF_TRACE_DEFINE("[Mutator] find constructor called")
-TURF_TRACE_DEFINE("[Mutator] find was redirected")
-TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called")
-TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] called")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] called")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected")
-TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected")
-TURF_TRACE_DEFINE("[get] called")
-TURF_TRACE_DEFINE("[get] was redirected")
-TURF_TRACE_DEFINE_END(ConcurrentMap_LeapFrog, 17)
-
-} // namespace junction
diff --git a/junction/ConcurrentMap_LeapFrog.h b/junction/ConcurrentMap_LeapFrog.h
deleted file mode 100644 (file)
index 2d0c0e2..0000000
+++ /dev/null
@@ -1,343 +0,0 @@
-/*------------------------------------------------------------------------
-  Junction: Concurrent data structures in C++
-  Copyright (c) 2016 Jeff Preshing
-
-  Distributed under the Simplified BSD License.
-  Original location: https://github.com/preshing/junction
-
-  This software is distributed WITHOUT ANY WARRANTY; without even the
-  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-  See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H
-#define JUNCTION_CONCURRENTMAP_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <junction/details/LeapFrog.h>
-#include <junction/QSBR.h>
-#include <turf/Heap.h>
-#include <turf/Trace.h>
-
-namespace junction {
-
-TURF_TRACE_DECLARE(ConcurrentMap_LeapFrog, 17)
-
-template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
-class ConcurrentMap_LeapFrog {
-public:
-    typedef K Key;
-    typedef V Value;
-    typedef KT KeyTraits;
-    typedef VT ValueTraits;
-    typedef typename turf::util::BestFit<Key>::Unsigned Hash;
-    typedef details::LeapFrog<ConcurrentMap_LeapFrog> Details;
-
-private:
-    turf::Atomic<typename Details::Table*> m_root;
-
-public:
-    ConcurrentMap_LeapFrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
-    }
-
-    ~ConcurrentMap_LeapFrog() {
-        typename Details::Table* table = m_root.loadNonatomic();
-        table->destroy();
-    }
-
-    // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
-    // after all the threads participating in the migration have completed their work.
-    void publishTableMigration(typename Details::TableMigration* migration) {
-        // There are no racing calls to this function.
-        typename Details::Table* oldRoot = m_root.loadNonatomic();
-        m_root.store(migration->m_destination, turf::Release);
-        TURF_ASSERT(oldRoot == migration->getSources()[0].table);
-        // Caller will GC the TableMigration and the source table.
-    }
-
-    // A Mutator represents a known cell in the hash table.
-    // It's meant for manipulations within a temporary function scope.
-    // Obviously you must not call QSBR::Update while holding a Mutator.
-    // Any operation that modifies the table (exchangeValue, eraseValue)
-    // may be forced to follow a redirected cell, which changes the Mutator itself.
-    // Note that even if the Mutator was constructed from an existing cell,
-    // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
-    // or if another thread deletes the key between the two steps.
-    class Mutator {
-    private:
-        friend class ConcurrentMap_LeapFrog;
-
-        ConcurrentMap_LeapFrog& m_map;
-        typename Details::Table* m_table;
-        typename Details::Cell* m_cell;
-        Value m_value;
-
-        // Constructor: Find existing cell
-        Mutator(ConcurrentMap_LeapFrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
-            TURF_TRACE(ConcurrentMap_LeapFrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key));
-            Hash hash = KeyTraits::hash(key);
-            for (;;) {
-                m_table = m_map.m_root.load(turf::Consume);
-                m_cell = Details::find(hash, m_table);
-                if (!m_cell)
-                    return;
-                m_value = m_cell->value.load(turf::Consume);
-                if (m_value != Value(ValueTraits::Redirect))
-                    return; // Found an existing value
-                // We've encountered a Redirect value. Help finish the migration.
-                TURF_TRACE(ConcurrentMap_LeapFrog, 1, "[Mutator] find was redirected", uptr(m_table), 0);
-                m_table->jobCoordinator.participate();
-                // Try again using the latest root.
-            }
-        }
-
-        // Constructor: Insert or find cell
-        Mutator(ConcurrentMap_LeapFrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
-            TURF_TRACE(ConcurrentMap_LeapFrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key));
-            Hash hash = KeyTraits::hash(key);
-            for (;;) {
-                m_table = m_map.m_root.load(turf::Consume);
-                ureg overflowIdx;
-                switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
-                case Details::InsertResult_InsertedNew: {
-                    // We've inserted a new cell. Don't load m_cell->value.
-                    return;
-                }
-                case Details::InsertResult_AlreadyFound: {
-                    // The hash was already found in the table.
-                    m_value = m_cell->value.load(turf::Consume);
-                    if (m_value == Value(ValueTraits::Redirect)) {
-                        // We've encountered a Redirect value.
-                        TURF_TRACE(ConcurrentMap_LeapFrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value));
-                        break; // Help finish the migration.
-                    }
-                    return; // Found an existing value
-                }
-                case Details::InsertResult_Overflow: {
-                    Details::beginTableMigration(m_map, m_table, overflowIdx);
-                    break;
-                }
-                }
-                // A migration has been started (either by us, or another thread). Participate until it's complete.
-                m_table->jobCoordinator.participate();
-                // Try again using the latest root.
-            }
-        }
-
-    public:
-        Value getValue() const {
-            // Return previously loaded value. Don't load it again.
-            return Value(m_value);
-        }
-
-        Value exchangeValue(Value desired) {
-            TURF_ASSERT(desired != Value(ValueTraits::NullValue));
-            TURF_ASSERT(desired != Value(ValueTraits::Redirect));
-            TURF_ASSERT(m_cell); // Cell must have been found or inserted
-            TURF_TRACE(ConcurrentMap_LeapFrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
-            for (;;) {
-                Value oldValue = m_value;
-                if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
-                    // Exchange was successful. Return previous value.
-                    TURF_TRACE(ConcurrentMap_LeapFrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
-                               uptr(desired));
-                    Value result = m_value;
-                    m_value = desired; // Leave the mutator in a valid state
-                    return result;
-                }
-                // The CAS failed and m_value has been updated with the latest value.
-                if (m_value != Value(ValueTraits::Redirect)) {
-                    TURF_TRACE(ConcurrentMap_LeapFrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
-                               uptr(m_value));
-                    if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
-                        TURF_TRACE(ConcurrentMap_LeapFrog, 7, "[Mutator::exchangeValue] racing write inserted new value",
-                                   uptr(m_table), uptr(m_value));
-                    }
-                    // There was a racing write (or erase) to this cell.
-                    // Pretend we exchanged with ourselves, and just let the racing write win.
-                    return desired;
-                }
-                // We've encountered a Redirect value. Help finish the migration.
-                TURF_TRACE(ConcurrentMap_LeapFrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
-                Hash hash = m_cell->hash.load(turf::Relaxed);
-                for (;;) {
-                    // Help complete the migration.
-                    m_table->jobCoordinator.participate();
-                    // Try again in the new table.
-                    m_table = m_map.m_root.load(turf::Consume);
-                    m_value = Value(ValueTraits::NullValue);
-                    ureg overflowIdx;
-                    switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
-                    case Details::InsertResult_AlreadyFound:
-                        m_value = m_cell->value.load(turf::Consume);
-                        if (m_value == Value(ValueTraits::Redirect)) {
-                            TURF_TRACE(ConcurrentMap_LeapFrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
-                                       uptr(m_value));
-                            break;
-                        }
-                        goto breakOuter;
-                    case Details::InsertResult_InsertedNew:
-                        goto breakOuter;
-                    case Details::InsertResult_Overflow:
-                        TURF_TRACE(ConcurrentMap_LeapFrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
-                                   overflowIdx);
-                        Details::beginTableMigration(m_map, m_table, overflowIdx);
-                        break;
-                    }
-                    // We were redirected... again
-                }
-            breakOuter:;
-                // Try again in the new table.
-            }
-        }
-
-        void setValue(Value desired) {
-            exchangeValue(desired);
-        }
-
-        Value eraseValue() {
-            TURF_ASSERT(m_cell); // Cell must have been found or inserted
-            TURF_TRACE(ConcurrentMap_LeapFrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell));
-            for (;;) {
-                if (m_value == Value(ValueTraits::NullValue))
-                    return Value(m_value);
-                TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
-                if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
-                    // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
-                    TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
-                    Value result = m_value;
-                    m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
-                    return result;
-                }
-                // The CAS failed and m_value has been updated with the latest value.
-                TURF_TRACE(ConcurrentMap_LeapFrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
-                           uptr(m_cell));
-                if (m_value != Value(ValueTraits::Redirect)) {
-                    // There was a racing write (or erase) to this cell.
-                    // Pretend we erased nothing, and just let the racing write win.
-                    return Value(ValueTraits::NullValue);
-                }
-                // We've been redirected to a new table.
-                TURF_TRACE(ConcurrentMap_LeapFrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
-                Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
-                for (;;) {
-                    // Help complete the migration.
-                    m_table->jobCoordinator.participate();
-                    // Try again in the new table.
-                    m_table = m_map.m_root.load(turf::Consume);
-                    m_cell = Details::find(hash, m_table);
-                    if (!m_cell) {
-                        m_value = Value(ValueTraits::NullValue);
-                        return m_value;
-                    }
-                    m_value = m_cell->value.load(turf::Relaxed);
-                    if (m_value != Value(ValueTraits::Redirect))
-                        break;
-                    TURF_TRACE(ConcurrentMap_LeapFrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
-                               uptr(m_cell));
-                }
-            }
-        }
-    };
-
-    Mutator insertOrFind(Key key) {
-        return Mutator(*this, key);
-    }
-
-    Mutator find(Key key) {
-        return Mutator(*this, key, false);
-    }
-
-    // Lookup without creating a temporary Mutator.
-    Value get(Key key) {
-        Hash hash = KeyTraits::hash(key);
-        TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(this), uptr(hash));
-        for (;;) {
-            typename Details::Table* table = m_root.load(turf::Consume);
-            typename Details::Cell* cell = Details::find(hash, table);
-            if (!cell)
-                return Value(ValueTraits::NullValue);
-            Value value = cell->value.load(turf::Consume);
-            if (value != Value(ValueTraits::Redirect))
-                return value; // Found an existing value
-            // We've been redirected to a new table. Help with the migration.
-            TURF_TRACE(ConcurrentMap_LeapFrog, 16, "[get] was redirected", uptr(table), uptr(hash));
-            table->jobCoordinator.participate();
-            // Try again in the new table.
-        }
-    }
-
-    Value set(Key key, Value desired) {
-        Mutator iter(*this, key);
-        return iter.exchangeValue(desired);
-    }
-
-    Value exchange(Key key, Value desired) {
-        Mutator iter(*this, key);
-        return iter.exchangeValue(desired);
-    }
-
-    Value erase(Key key) {
-        Mutator iter(*this, key, false);
-        return iter.eraseValue();
-    }
-
-    // The easiest way to implement an Iterator is to prevent all Redirects.
-    // The currrent Iterator does that by forbidding concurrent inserts.
-    // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
-    class Iterator {
-    private:
-        typename Details::Table* m_table;
-        ureg m_idx;
-        Key m_hash;
-        Value m_value;
-
-    public:
-        Iterator(ConcurrentMap_LeapFrog& map) {
-            // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
-            m_table = map.m_root.load(turf::Consume);
-            m_idx = -1;
-            next();
-        }
-
-        void next() {
-            TURF_ASSERT(m_table);
-            TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
-            while (++m_idx <= m_table->sizeMask) {
-                // Index still inside range of table.
-                typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
-                typename Details::Cell* cell = group->cells + (m_idx & 3);
-                m_hash = cell->hash.load(turf::Relaxed);
-                if (m_hash != KeyTraits::NullHash) {
-                    // Cell has been reserved.
-                    m_value = cell->value.load(turf::Relaxed);
-                    TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
-                    if (m_value != Value(ValueTraits::NullValue))
-                        return; // Yield this cell.
-                }
-            }
-            // That's the end of the map.
-            m_hash = KeyTraits::NullHash;
-            m_value = Value(ValueTraits::NullValue);
-        }
-
-        bool isValid() const {
-            return m_value != Value(ValueTraits::NullValue);
-        }
-
-        Key getKey() const {
-            TURF_ASSERT(isValid());
-            // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
-            return KeyTraits::dehash(m_hash);
-        }
-
-        Value getValue() const {
-            TURF_ASSERT(isValid());
-            return m_value;
-        }
-    };
-};
-
-} // namespace junction
-
-#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H
diff --git a/junction/ConcurrentMap_Leapfrog.cpp b/junction/ConcurrentMap_Leapfrog.cpp
new file mode 100644 (file)
index 0000000..e4aa4b7
--- /dev/null
@@ -0,0 +1,37 @@
+/*------------------------------------------------------------------------
+  Junction: Concurrent data structures in C++
+  Copyright (c) 2016 Jeff Preshing
+
+  Distributed under the Simplified BSD License.
+  Original location: https://github.com/preshing/junction
+
+  This software is distributed WITHOUT ANY WARRANTY; without even the
+  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+  See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#include <junction/ConcurrentMap_Leapfrog.h>
+
+namespace junction {
+
+TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Leapfrog, 17) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE("[Mutator] find constructor called")
+TURF_TRACE_DEFINE("[Mutator] find was redirected")
+TURF_TRACE_DEFINE("[Mutator] insertOrFind constructor called")
+TURF_TRACE_DEFINE("[Mutator] insertOrFind was redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] called")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] was redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] was re-redirected")
+TURF_TRACE_DEFINE("[Mutator::exchangeValue] overflow after redirect")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] called")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] detected race to write value")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected")
+TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected")
+TURF_TRACE_DEFINE("[get] called")
+TURF_TRACE_DEFINE("[get] was redirected")
+TURF_TRACE_DEFINE_END(ConcurrentMap_Leapfrog, 17)
+
+} // namespace junction
diff --git a/junction/ConcurrentMap_Leapfrog.h b/junction/ConcurrentMap_Leapfrog.h
new file mode 100644 (file)
index 0000000..89ba120
--- /dev/null
@@ -0,0 +1,343 @@
+/*------------------------------------------------------------------------
+  Junction: Concurrent data structures in C++
+  Copyright (c) 2016 Jeff Preshing
+
+  Distributed under the Simplified BSD License.
+  Original location: https://github.com/preshing/junction
+
+  This software is distributed WITHOUT ANY WARRANTY; without even the
+  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+  See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_CONCURRENTMAP_LEAPFROG_H
+#define JUNCTION_CONCURRENTMAP_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <junction/details/Leapfrog.h>
+#include <junction/QSBR.h>
+#include <turf/Heap.h>
+#include <turf/Trace.h>
+
+namespace junction {
+
+TURF_TRACE_DECLARE(ConcurrentMap_Leapfrog, 17)
+
+template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
+class ConcurrentMap_Leapfrog {
+public:
+    typedef K Key;
+    typedef V Value;
+    typedef KT KeyTraits;
+    typedef VT ValueTraits;
+    typedef typename turf::util::BestFit<Key>::Unsigned Hash;
+    typedef details::Leapfrog<ConcurrentMap_Leapfrog> Details;
+
+private:
+    turf::Atomic<typename Details::Table*> m_root;
+
+public:
+    ConcurrentMap_Leapfrog(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
+    }
+
+    ~ConcurrentMap_Leapfrog() {
+        typename Details::Table* table = m_root.loadNonatomic();
+        table->destroy();
+    }
+
+    // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
+    // after all the threads participating in the migration have completed their work.
+    void publishTableMigration(typename Details::TableMigration* migration) {
+        // There are no racing calls to this function.
+        typename Details::Table* oldRoot = m_root.loadNonatomic();
+        m_root.store(migration->m_destination, turf::Release);
+        TURF_ASSERT(oldRoot == migration->getSources()[0].table);
+        // Caller will GC the TableMigration and the source table.
+    }
+
+    // A Mutator represents a known cell in the hash table.
+    // It's meant for manipulations within a temporary function scope.
+    // Obviously you must not call QSBR::Update while holding a Mutator.
+    // Any operation that modifies the table (exchangeValue, eraseValue)
+    // may be forced to follow a redirected cell, which changes the Mutator itself.
+    // Note that even if the Mutator was constructed from an existing cell,
+    // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
+    // or if another thread deletes the key between the two steps.
+    class Mutator {
+    private:
+        friend class ConcurrentMap_Leapfrog;
+
+        ConcurrentMap_Leapfrog& m_map;
+        typename Details::Table* m_table;
+        typename Details::Cell* m_cell;
+        Value m_value;
+
+        // Constructor: Find existing cell
+        Mutator(ConcurrentMap_Leapfrog& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
+            TURF_TRACE(ConcurrentMap_Leapfrog, 0, "[Mutator] find constructor called", uptr(0), uptr(key));
+            Hash hash = KeyTraits::hash(key);
+            for (;;) {
+                m_table = m_map.m_root.load(turf::Consume);
+                m_cell = Details::find(hash, m_table);
+                if (!m_cell)
+                    return;
+                m_value = m_cell->value.load(turf::Consume);
+                if (m_value != Value(ValueTraits::Redirect))
+                    return; // Found an existing value
+                // We've encountered a Redirect value. Help finish the migration.
+                TURF_TRACE(ConcurrentMap_Leapfrog, 1, "[Mutator] find was redirected", uptr(m_table), 0);
+                m_table->jobCoordinator.participate();
+                // Try again using the latest root.
+            }
+        }
+
+        // Constructor: Insert or find cell
+        Mutator(ConcurrentMap_Leapfrog& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
+            TURF_TRACE(ConcurrentMap_Leapfrog, 2, "[Mutator] insertOrFind constructor called", uptr(0), uptr(key));
+            Hash hash = KeyTraits::hash(key);
+            for (;;) {
+                m_table = m_map.m_root.load(turf::Consume);
+                ureg overflowIdx;
+                switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+                case Details::InsertResult_InsertedNew: {
+                    // We've inserted a new cell. Don't load m_cell->value.
+                    return;
+                }
+                case Details::InsertResult_AlreadyFound: {
+                    // The hash was already found in the table.
+                    m_value = m_cell->value.load(turf::Consume);
+                    if (m_value == Value(ValueTraits::Redirect)) {
+                        // We've encountered a Redirect value.
+                        TURF_TRACE(ConcurrentMap_Leapfrog, 3, "[Mutator] insertOrFind was redirected", uptr(m_table), uptr(m_value));
+                        break; // Help finish the migration.
+                    }
+                    return; // Found an existing value
+                }
+                case Details::InsertResult_Overflow: {
+                    Details::beginTableMigration(m_map, m_table, overflowIdx);
+                    break;
+                }
+                }
+                // A migration has been started (either by us, or another thread). Participate until it's complete.
+                m_table->jobCoordinator.participate();
+                // Try again using the latest root.
+            }
+        }
+
+    public:
+        Value getValue() const {
+            // Return previously loaded value. Don't load it again.
+            return Value(m_value);
+        }
+
+        Value exchangeValue(Value desired) {
+            TURF_ASSERT(desired != Value(ValueTraits::NullValue));
+            TURF_ASSERT(desired != Value(ValueTraits::Redirect));
+            TURF_ASSERT(m_cell); // Cell must have been found or inserted
+            TURF_TRACE(ConcurrentMap_Leapfrog, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
+            for (;;) {
+                Value oldValue = m_value;
+                if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
+                    // Exchange was successful. Return previous value.
+                    TURF_TRACE(ConcurrentMap_Leapfrog, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value),
+                               uptr(desired));
+                    Value result = m_value;
+                    m_value = desired; // Leave the mutator in a valid state
+                    return result;
+                }
+                // The CAS failed and m_value has been updated with the latest value.
+                if (m_value != Value(ValueTraits::Redirect)) {
+                    TURF_TRACE(ConcurrentMap_Leapfrog, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+                               uptr(m_value));
+                    if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
+                        TURF_TRACE(ConcurrentMap_Leapfrog, 7, "[Mutator::exchangeValue] racing write inserted new value",
+                                   uptr(m_table), uptr(m_value));
+                    }
+                    // There was a racing write (or erase) to this cell.
+                    // Pretend we exchanged with ourselves, and just let the racing write win.
+                    return desired;
+                }
+                // We've encountered a Redirect value. Help finish the migration.
+                TURF_TRACE(ConcurrentMap_Leapfrog, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
+                Hash hash = m_cell->hash.load(turf::Relaxed);
+                for (;;) {
+                    // Help complete the migration.
+                    m_table->jobCoordinator.participate();
+                    // Try again in the new table.
+                    m_table = m_map.m_root.load(turf::Consume);
+                    m_value = Value(ValueTraits::NullValue);
+                    ureg overflowIdx;
+                    switch (Details::insertOrFind(hash, m_table, m_cell, overflowIdx)) { // Modifies m_cell
+                    case Details::InsertResult_AlreadyFound:
+                        m_value = m_cell->value.load(turf::Consume);
+                        if (m_value == Value(ValueTraits::Redirect)) {
+                            TURF_TRACE(ConcurrentMap_Leapfrog, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+                                       uptr(m_value));
+                            break;
+                        }
+                        goto breakOuter;
+                    case Details::InsertResult_InsertedNew:
+                        goto breakOuter;
+                    case Details::InsertResult_Overflow:
+                        TURF_TRACE(ConcurrentMap_Leapfrog, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
+                                   overflowIdx);
+                        Details::beginTableMigration(m_map, m_table, overflowIdx);
+                        break;
+                    }
+                    // We were redirected... again
+                }
+            breakOuter:;
+                // Try again in the new table.
+            }
+        }
+
+        void setValue(Value desired) {
+            exchangeValue(desired);
+        }
+
+        Value eraseValue() {
+            TURF_ASSERT(m_cell); // Cell must have been found or inserted
+            TURF_TRACE(ConcurrentMap_Leapfrog, 11, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_cell));
+            for (;;) {
+                if (m_value == Value(ValueTraits::NullValue))
+                    return Value(m_value);
+                TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
+                if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
+                    // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
+                    TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
+                    Value result = m_value;
+                    m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
+                    return result;
+                }
+                // The CAS failed and m_value has been updated with the latest value.
+                TURF_TRACE(ConcurrentMap_Leapfrog, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+                           uptr(m_cell));
+                if (m_value != Value(ValueTraits::Redirect)) {
+                    // There was a racing write (or erase) to this cell.
+                    // Pretend we erased nothing, and just let the racing write win.
+                    return Value(ValueTraits::NullValue);
+                }
+                // We've been redirected to a new table.
+                TURF_TRACE(ConcurrentMap_Leapfrog, 13, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
+                Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
+                for (;;) {
+                    // Help complete the migration.
+                    m_table->jobCoordinator.participate();
+                    // Try again in the new table.
+                    m_table = m_map.m_root.load(turf::Consume);
+                    m_cell = Details::find(hash, m_table);
+                    if (!m_cell) {
+                        m_value = Value(ValueTraits::NullValue);
+                        return m_value;
+                    }
+                    m_value = m_cell->value.load(turf::Relaxed);
+                    if (m_value != Value(ValueTraits::Redirect))
+                        break;
+                    TURF_TRACE(ConcurrentMap_Leapfrog, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+                               uptr(m_cell));
+                }
+            }
+        }
+    };
+
+    Mutator insertOrFind(Key key) {
+        return Mutator(*this, key);
+    }
+
+    Mutator find(Key key) {
+        return Mutator(*this, key, false);
+    }
+
+    // Lookup without creating a temporary Mutator.
+    Value get(Key key) {
+        Hash hash = KeyTraits::hash(key);
+        TURF_TRACE(ConcurrentMap_Leapfrog, 15, "[get] called", uptr(this), uptr(hash));
+        for (;;) {
+            typename Details::Table* table = m_root.load(turf::Consume);
+            typename Details::Cell* cell = Details::find(hash, table);
+            if (!cell)
+                return Value(ValueTraits::NullValue);
+            Value value = cell->value.load(turf::Consume);
+            if (value != Value(ValueTraits::Redirect))
+                return value; // Found an existing value
+            // We've been redirected to a new table. Help with the migration.
+            TURF_TRACE(ConcurrentMap_Leapfrog, 16, "[get] was redirected", uptr(table), uptr(hash));
+            table->jobCoordinator.participate();
+            // Try again in the new table.
+        }
+    }
+
+    Value set(Key key, Value desired) {
+        Mutator iter(*this, key);
+        return iter.exchangeValue(desired);
+    }
+
+    Value exchange(Key key, Value desired) {
+        Mutator iter(*this, key);
+        return iter.exchangeValue(desired);
+    }
+
+    Value erase(Key key) {
+        Mutator iter(*this, key, false);
+        return iter.eraseValue();
+    }
+
+    // The easiest way to implement an Iterator is to prevent all Redirects.
+    // The currrent Iterator does that by forbidding concurrent inserts.
+    // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
+    class Iterator {
+    private:
+        typename Details::Table* m_table;
+        ureg m_idx;
+        Key m_hash;
+        Value m_value;
+
+    public:
+        Iterator(ConcurrentMap_Leapfrog& map) {
+            // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
+            m_table = map.m_root.load(turf::Consume);
+            m_idx = -1;
+            next();
+        }
+
+        void next() {
+            TURF_ASSERT(m_table);
+            TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
+            while (++m_idx <= m_table->sizeMask) {
+                // Index still inside range of table.
+                typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
+                typename Details::Cell* cell = group->cells + (m_idx & 3);
+                m_hash = cell->hash.load(turf::Relaxed);
+                if (m_hash != KeyTraits::NullHash) {
+                    // Cell has been reserved.
+                    m_value = cell->value.load(turf::Relaxed);
+                    TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
+                    if (m_value != Value(ValueTraits::NullValue))
+                        return; // Yield this cell.
+                }
+            }
+            // That's the end of the map.
+            m_hash = KeyTraits::NullHash;
+            m_value = Value(ValueTraits::NullValue);
+        }
+
+        bool isValid() const {
+            return m_value != Value(ValueTraits::NullValue);
+        }
+
+        Key getKey() const {
+            TURF_ASSERT(isValid());
+            // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
+            return KeyTraits::dehash(m_hash);
+        }
+
+        Value getValue() const {
+            TURF_ASSERT(isValid());
+            return m_value;
+        }
+    };
+};
+
+} // namespace junction
+
+#endif // JUNCTION_CONCURRENTMAP_LEAPFROG_H
diff --git a/junction/details/LeapFrog.cpp b/junction/details/LeapFrog.cpp
deleted file mode 100644 (file)
index 7cb9452..0000000
+++ /dev/null
@@ -1,57 +0,0 @@
-/*------------------------------------------------------------------------
-  Junction: Concurrent data structures in C++
-  Copyright (c) 2016 Jeff Preshing
-
-  Distributed under the Simplified BSD License.
-  Original location: https://github.com/preshing/junction
-
-  This software is distributed WITHOUT ANY WARRANTY; without even the
-  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-  See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#include <junction/Core.h>
-#include <junction/details/LeapFrog.h>
-#include <turf/Heap.h>
-
-namespace junction {
-namespace details {
-
-TURF_TRACE_DEFINE_BEGIN(LeapFrog, 33) // autogenerated by TidySource.py
-TURF_TRACE_DEFINE("[find] called")
-TURF_TRACE_DEFINE("[find] found existing cell optimistically")
-TURF_TRACE_DEFINE("[find] found existing cell")
-TURF_TRACE_DEFINE("[insertOrFind] called")
-TURF_TRACE_DEFINE("[insertOrFind] reserved first cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell")
-TURF_TRACE_DEFINE("[insertOrFind] found in first cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to read hash")
-TURF_TRACE_DEFINE("[insertOrFind] found in probe chain")
-TURF_TRACE_DEFINE("[insertOrFind] reserved cell")
-TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell")
-TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain")
-TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket")
-TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell")
-TURF_TRACE_DEFINE("[insertOrFind] overflow")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
-TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
-TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
-TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
-TURF_TRACE_DEFINE("[migrateRange] race to insert key")
-TURF_TRACE_DEFINE("[migrateRange] race to insert value")
-TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect")
-TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected")
-TURF_TRACE_DEFINE("[migrateRange] racing update was erase")
-TURF_TRACE_DEFINE("[migrateRange] race to update migrated value")
-TURF_TRACE_DEFINE("[TableMigration::run] already ended")
-TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set")
-TURF_TRACE_DEFINE("[TableMigration::run] destination overflow")
-TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed")
-TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
-TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
-TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
-TURF_TRACE_DEFINE_END(LeapFrog, 33)
-
-} // namespace details
-} // namespace junction
diff --git a/junction/details/LeapFrog.h b/junction/details/LeapFrog.h
deleted file mode 100644 (file)
index bc184f9..0000000
+++ /dev/null
@@ -1,577 +0,0 @@
-/*------------------------------------------------------------------------
-  Junction: Concurrent data structures in C++
-  Copyright (c) 2016 Jeff Preshing
-
-  Distributed under the Simplified BSD License.
-  Original location: https://github.com/preshing/junction
-
-  This software is distributed WITHOUT ANY WARRANTY; without even the
-  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-  See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_DETAILS_LEAPFROG_H
-#define JUNCTION_DETAILS_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <turf/Atomic.h>
-#include <turf/Mutex.h>
-#include <turf/ManualResetEvent.h>
-#include <turf/Util.h>
-#include <junction/MapTraits.h>
-#include <turf/Trace.h>
-#include <turf/Heap.h>
-#include <junction/SimpleJobCoordinator.h>
-#include <junction/QSBR.h>
-
-// Enable this to force migration overflows (for test purposes):
-#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
-
-namespace junction {
-namespace details {
-
-TURF_TRACE_DECLARE(LeapFrog, 33)
-
-template <class Map>
-struct LeapFrog {
-    typedef typename Map::Hash Hash;
-    typedef typename Map::Value Value;
-    typedef typename Map::KeyTraits KeyTraits;
-    typedef typename Map::ValueTraits ValueTraits;
-
-    static const ureg InitialSize = 8;
-    static const ureg TableMigrationUnitSize = 32;
-    static const ureg LinearSearchLimit = 128;
-    static const ureg CellsInUseSample = LinearSearchLimit;
-    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
-    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
-
-    struct Cell {
-        turf::Atomic<Hash> hash;
-        turf::Atomic<Value> value;
-    };
-
-    struct CellGroup {
-        // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
-        // Each cell in the probe chain is located within the table itself.
-        // "deltas" determines the index of the next cell in the probe chain.
-        // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
-        // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
-        // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket.
-        turf::Atomic<u8> deltas[8];
-        Cell cells[4];
-    };
-
-    struct Table {
-        const ureg sizeMask;                 // a power of two minus one
-        turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
-        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
-
-        Table(ureg sizeMask) : sizeMask(sizeMask) {
-        }
-
-        static Table* create(ureg tableSize) {
-            TURF_ASSERT(turf::util::isPowerOf2(tableSize));
-            TURF_ASSERT(tableSize >= 4);
-            ureg numGroups = tableSize >> 2;
-            Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
-            new (table) Table(tableSize - 1);
-            for (ureg i = 0; i < numGroups; i++) {
-                CellGroup* group = table->getCellGroups() + i;
-                for (ureg j = 0; j < 4; j++) {
-                    group->deltas[j].storeNonatomic(0);
-                    group->deltas[j + 4].storeNonatomic(0);
-                    group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
-                    group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
-                }
-            }
-            return table;
-        }
-
-        void destroy() {
-            this->Table::~Table();
-            TURF_HEAP.free(this);
-        }
-
-        CellGroup* getCellGroups() const {
-            return (CellGroup*) (this + 1);
-        }
-
-        ureg getNumMigrationUnits() const {
-            return sizeMask / TableMigrationUnitSize + 1;
-        }
-    };
-
-    class TableMigration : public SimpleJobCoordinator::Job {
-    public:
-        struct Source {
-            Table* table;
-            turf::Atomic<ureg> sourceIndex;
-        };
-
-        Map& m_map;
-        Table* m_destination;
-        turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
-        turf::Atomic<bool> m_overflowed;
-        turf::Atomic<sreg> m_unitsRemaining;
-        ureg m_numSources;
-
-        TableMigration(Map& map) : m_map(map) {
-        }
-
-        static TableMigration* create(Map& map, ureg numSources) {
-            TableMigration* migration =
-                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
-            new (migration) TableMigration(map);
-            migration->m_workerStatus.storeNonatomic(0);
-            migration->m_overflowed.storeNonatomic(false);
-            migration->m_unitsRemaining.storeNonatomic(0);
-            migration->m_numSources = numSources;
-            // Caller is responsible for filling in sources & destination
-            return migration;
-        }
-
-        virtual ~TableMigration() TURF_OVERRIDE {
-        }
-
-        void destroy() {
-            // Destroy all source tables.
-            for (ureg i = 0; i < m_numSources; i++)
-                if (getSources()[i].table)
-                    getSources()[i].table->destroy();
-            // Delete the migration object itself.
-            this->TableMigration::~TableMigration();
-            TURF_HEAP.free(this);
-        }
-
-        Source* getSources() const {
-            return (Source*) (this + 1);
-        }
-
-        bool migrateRange(Table* srcTable, ureg startIdx);
-        virtual void run() TURF_OVERRIDE;
-    };
-
-    static Cell* find(Hash hash, Table* table) {
-        TURF_TRACE(LeapFrog, 0, "[find] called", uptr(table), hash);
-        TURF_ASSERT(table);
-        TURF_ASSERT(hash != KeyTraits::NullHash);
-        ureg sizeMask = table->sizeMask;
-        // Optimistically check hashed cell even though it might belong to another bucket
-        ureg idx = hash & sizeMask;
-        CellGroup* group = table->getCellGroups() + (idx >> 2);
-        Cell* cell = group->cells + (idx & 3);
-        Hash probeHash = cell->hash.load(turf::Relaxed);
-        if (probeHash == hash) {
-            TURF_TRACE(LeapFrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
-            return cell;
-        } else if (probeHash == KeyTraits::NullHash) {
-            return cell = NULL;
-        }
-        // Follow probe chain for our bucket
-        u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
-        while (delta) {
-            idx = (idx + delta) & sizeMask;
-            group = table->getCellGroups() + (idx >> 2);
-            cell = group->cells + (idx & 3);
-            Hash probeHash = cell->hash.load(turf::Relaxed);
-            // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
-            // but we don't check for it. We just follow the probe chain.
-            if (probeHash == hash) {
-                TURF_TRACE(LeapFrog, 2, "[find] found existing cell", uptr(table), idx);
-                return cell;
-            }
-            delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
-        }
-        // End of probe chain, not found
-        return NULL;
-    }
-
-    // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
-    enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
-    static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
-        TURF_TRACE(LeapFrog, 3, "[insertOrFind] called", uptr(table), hash);
-        TURF_ASSERT(table);
-        TURF_ASSERT(hash != KeyTraits::NullHash);
-        ureg sizeMask = table->sizeMask;
-        ureg idx = ureg(hash);
-
-        // Check hashed cell first, though it may not even belong to the bucket.
-        CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
-        cell = group->cells + (idx & 3);
-        Hash probeHash = cell->hash.load(turf::Relaxed);
-        if (probeHash == KeyTraits::NullHash) {
-            if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
-                TURF_TRACE(LeapFrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
-                // There are no links to set. We're done.
-                return InsertResult_InsertedNew;
-            } else {
-                TURF_TRACE(LeapFrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
-                // Fall through to check if it was the same hash...
-            }
-        }
-        if (probeHash == hash) {
-            TURF_TRACE(LeapFrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
-            return InsertResult_AlreadyFound;
-        }
-
-        // Follow the link chain for this bucket.
-        ureg maxIdx = idx + sizeMask;
-        ureg linkLevel = 0;
-        turf::Atomic<u8>* prevLink;
-        for (;;) {
-        followLink:
-            prevLink = group->deltas + ((idx & 3) + linkLevel);
-            linkLevel = 4;
-            u8 probeDelta = prevLink->load(turf::Relaxed);
-            if (probeDelta) {
-                idx += probeDelta;
-                // Check the hash for this cell.
-                group = table->getCellGroups() + ((idx & sizeMask) >> 2);
-                cell = group->cells + (idx & 3);
-                probeHash = cell->hash.load(turf::Relaxed);
-                if (probeHash == KeyTraits::NullHash) {
-                    // Cell was linked, but hash is not visible yet.
-                    // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
-                    // just poll until it becomes visible.
-                    TURF_TRACE(LeapFrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
-                    do {
-                        probeHash = cell->hash.load(turf::Acquire);
-                    } while (probeHash == KeyTraits::NullHash);
-                }
-                TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
-                if (probeHash == hash) {
-                    TURF_TRACE(LeapFrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
-                    return InsertResult_AlreadyFound;
-                }
-            } else {
-                // Reached the end of the link chain for this bucket.
-                // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
-                ureg prevLinkIdx = idx;
-                TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
-                ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
-                while (linearProbesRemaining-- > 0) {
-                    idx++;
-                    group = table->getCellGroups() + ((idx & sizeMask) >> 2);
-                    cell = group->cells + (idx & 3);
-                    probeHash = cell->hash.load(turf::Relaxed);
-                    if (probeHash == KeyTraits::NullHash) {
-                        // It's an empty cell. Try to reserve it.
-                        if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
-                            // Success. We've reserved the cell. Link it to previous cell in same bucket.
-                            TURF_TRACE(LeapFrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
-                            TURF_ASSERT(probeDelta == 0);
-                            u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
-                            probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
-                            TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
-#else
-                            prevLink->store(desiredDelta, turf::Relaxed);
-#endif
-                            return InsertResult_InsertedNew;
-                        } else {
-                            TURF_TRACE(LeapFrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
-                            // Fall through to check if it's the same hash...
-                        }
-                    }
-                    Hash x = (probeHash ^ hash);
-                    // Check for same hash.
-                    if (!x) {
-                        TURF_TRACE(LeapFrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
-                        return InsertResult_AlreadyFound;
-                    }
-                    // Check for same bucket.
-                    if ((x & sizeMask) == 0) {
-                        TURF_TRACE(LeapFrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
-                        // Attempt to set the link on behalf of the late-arriving cell.
-                        // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
-                        // there's no guarantee that our own link chain will be well-formed by the time this function returns.
-                        // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
-                        u8 desiredDelta = idx - prevLinkIdx;
-#if TURF_WITH_ASSERTS
-                        probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
-                        TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
-                        if (probeDelta == 0)
-                            TURF_TRACE(LeapFrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
-#else
-                        prevLink->store(desiredDelta, turf::Relaxed);
-#endif
-                        goto followLink; // Try to follow link chain for the bucket again.
-                    }
-                    // Continue linear search...
-                }
-                // Table is too full to insert.
-                overflowIdx = idx + 1;
-                TURF_TRACE(LeapFrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
-                return InsertResult_Overflow;
-            }
-        }
-    }
-
-    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
-        // Create new migration by DCLI.
-        TURF_TRACE(LeapFrog, 15, "[beginTableMigrationToSize] called", 0, 0);
-        SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
-        if (job) {
-            TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
-        } else {
-            turf::LockGuard<turf::Mutex> guard(table->mutex);
-            job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
-            if (job) {
-                TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
-            } else {
-                // Create new migration.
-                TableMigration* migration = TableMigration::create(map, 1);
-                migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
-                migration->getSources()[0].table = table;
-                migration->getSources()[0].sourceIndex.storeNonatomic(0);
-                migration->m_destination = Table::create(nextTableSize);
-                // Publish the new migration.
-                table->jobCoordinator.storeRelease(migration);
-            }
-        }
-    }
-
-    static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
-        // Estimate number of cells in use based on a small sample.
-        ureg sizeMask = table->sizeMask;
-        ureg idx = overflowIdx - CellsInUseSample;
-        ureg inUseCells = 0;
-        for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
-            CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
-            Cell* cell = group->cells + (idx & 3);
-            Value value = cell->value.load(turf::Relaxed);
-            if (value == Value(ValueTraits::Redirect)) {
-                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
-                TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
-                return;
-            }
-            if (value != Value(ValueTraits::NullValue))
-                inUseCells++;
-            idx++;
-        }
-        float inUseRatio = float(inUseCells) / CellsInUseSample;
-        float estimatedInUse = (sizeMask + 1) * inUseRatio;
-#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
-        // Periodically underestimate the number of cells in use.
-        // This exercises the code that handles overflow during migration.
-        static ureg counter = 1;
-        if ((++counter & 3) == 0) {
-            estimatedInUse /= 4;
-        }
-#endif
-        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
-        beginTableMigrationToSize(map, table, nextTableSize);
-    }
-}; // LeapFrog
-
-template <class Map>
-bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
-    ureg srcSizeMask = srcTable->sizeMask;
-    ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
-    // Iterate over source range.
-    for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
-        CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
-        Cell* srcCell = srcGroup->cells + (srcIdx & 3);
-        Hash srcHash;
-        Value srcValue;
-        // Fetch the srcHash and srcValue.
-        for (;;) {
-            srcHash = srcCell->hash.load(turf::Relaxed);
-            if (srcHash == KeyTraits::NullHash) {
-                // An unused cell. Try to put a Redirect marker in its value.
-                srcValue =
-                    srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
-                if (srcValue == Value(ValueTraits::Redirect)) {
-                    // srcValue is already marked Redirect due to previous incomplete migration.
-                    TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
-                    break;
-                }
-                if (srcValue == Value(ValueTraits::NullValue))
-                    break; // Redirect has been placed. Break inner loop, continue outer loop.
-                TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
-                // Otherwise, somebody just claimed the cell. Read srcHash again...
-            } else {
-                // Check for deleted/uninitialized value.
-                srcValue = srcCell->value.load(turf::Relaxed);
-                if (srcValue == Value(ValueTraits::NullValue)) {
-                    // Try to put a Redirect marker.
-                    if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
-                        break; // Redirect has been placed. Break inner loop, continue outer loop.
-                    TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
-                    if (srcValue == Value(ValueTraits::Redirect)) {
-                        // FIXME: I don't think this will happen. Investigate & change to assert
-                        TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
-                        break;
-                    }
-                } else if (srcValue == Value(ValueTraits::Redirect)) {
-                    // srcValue is already marked Redirect due to previous incomplete migration.
-                    TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
-                    break;
-                }
-
-                // We've got a key/value pair to migrate.
-                // Reserve a destination cell in the destination.
-                TURF_ASSERT(srcHash != KeyTraits::NullHash);
-                TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
-                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
-                Cell* dstCell;
-                ureg overflowIdx;
-                InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
-                // During migration, a hash can only exist in one place among all the source tables,
-                // and it is only migrated by one thread. Therefore, the hash will never already exist
-                // in the destination table:
-                TURF_ASSERT(result != InsertResult_AlreadyFound);
-                if (result == InsertResult_Overflow) {
-                    // Destination overflow.
-                    // This can happen for several reasons. For example, the source table could have
-                    // existed of all deleted cells when it overflowed, resulting in a small destination
-                    // table size, but then another thread could re-insert all the same hashes
-                    // before the migration completed.
-                    // Caller will cancel the current migration and begin a new one.
-                    return false;
-                }
-                // Migrate the old value to the new cell.
-                for (;;) {
-                    // Copy srcValue to the destination.
-                    dstCell->value.store(srcValue, turf::Relaxed);
-                    // Try to place a Redirect marker in srcValue.
-                    Value doubleCheckedSrcValue =
-                        srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
-                    TURF_ASSERT(doubleCheckedSrcValue !=
-                                Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
-                    if (doubleCheckedSrcValue == srcValue) {
-                        // No racing writes to the src. We've successfully placed the Redirect marker.
-                        // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
-                        // by a late-arriving erase.
-                        if (srcValue == Value(ValueTraits::NullValue))
-                            TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
-                        break;
-                    }
-                    // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
-                    TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
-                    srcValue = doubleCheckedSrcValue;
-                }
-                // Cell successfully migrated. Proceed to next source cell.
-                break;
-            }
-        }
-    }
-    // Range has been migrated successfully.
-    return true;
-}
-
-template <class Map>
-void LeapFrog<Map>::TableMigration::run() {
-    // Conditionally increment the shared # of workers.
-    ureg probeStatus = m_workerStatus.load(turf::Relaxed);
-    do {
-        if (probeStatus & 1) {
-            // End flag is already set, so do nothing.
-            TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
-            return;
-        }
-    } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
-    // # of workers has been incremented, and the end flag is clear.
-    TURF_ASSERT((probeStatus & 1) == 0);
-
-    // Iterate over all source tables.
-    for (ureg s = 0; s < m_numSources; s++) {
-        Source& source = getSources()[s];
-        // Loop over all migration units in this source table.
-        for (;;) {
-            if (m_workerStatus.load(turf::Relaxed) & 1) {
-                TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
-                goto endMigration;
-            }
-            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
-            if (startIdx >= source.table->sizeMask + 1)
-                break; // No more migration units in this table. Try next source table.
-            bool overflowed = !migrateRange(source.table, startIdx);
-            if (overflowed) {
-                // *** FAILED MIGRATION ***
-                // TableMigration failed due to destination table overflow.
-                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
-                // hence m_unitsRemaining won't reach zero.
-                // However, multiple threads can independently detect a failed migration at the same time.
-                TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
-                // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
-                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
-                // the thread
-                // that deals with it.
-                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
-                if (oldOverflowed)
-                    TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
-                               uptr(oldOverflowed));
-                m_workerStatus.fetchOr(1, turf::Relaxed);
-                goto endMigration;
-            }
-            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
-            TURF_ASSERT(prevRemaining > 0);
-            if (prevRemaining == 1) {
-                // *** SUCCESSFUL MIGRATION ***
-                // That was the last chunk to migrate.
-                m_workerStatus.fetchOr(1, turf::Relaxed);
-                goto endMigration;
-            }
-        }
-    }
-    TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
-
-endMigration:
-    // Decrement the shared # of workers.
-    probeStatus = m_workerStatus.fetchSub(
-        2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
-    if (probeStatus >= 4) {
-        // There are other workers remaining. Return here so that only the very last worker will proceed.
-        TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
-        return;
-    }
-
-    // We're the very last worker thread.
-    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
-    TURF_ASSERT(probeStatus == 3);
-    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
-    if (!overflowed) {
-        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
-        m_map.publishTableMigration(this);
-        // End the jobCoodinator.
-        getSources()[0].table->jobCoordinator.end();
-    } else {
-        // The migration failed due to the overflow of the destination table.
-        Table* origTable = getSources()[0].table;
-        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
-        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
-        if (checkedJob != this) {
-            TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
-                       uptr(checkedJob));
-        } else {
-            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
-            // Double the destination table size.
-            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
-            // Transfer source tables to the new migration.
-            for (ureg i = 0; i < m_numSources; i++) {
-                migration->getSources()[i].table = getSources()[i].table;
-                getSources()[i].table = NULL;
-                migration->getSources()[i].sourceIndex.storeNonatomic(0);
-            }
-            migration->getSources()[m_numSources].table = m_destination;
-            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
-            // Calculate total number of migration units to move.
-            ureg unitsRemaining = 0;
-            for (ureg s = 0; s < migration->m_numSources; s++)
-                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
-            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
-            // Publish the new migration.
-            origTable->jobCoordinator.storeRelease(migration);
-        }
-    }
-
-    // We're done with this TableMigration. Queue it for GC.
-    DefaultQSBR.enqueue(&TableMigration::destroy, this);
-}
-
-} // namespace details
-} // namespace junction
-
-#endif // JUNCTION_DETAILS_LEAPFROG_H
diff --git a/junction/details/Leapfrog.cpp b/junction/details/Leapfrog.cpp
new file mode 100644 (file)
index 0000000..367cd3d
--- /dev/null
@@ -0,0 +1,57 @@
+/*------------------------------------------------------------------------
+  Junction: Concurrent data structures in C++
+  Copyright (c) 2016 Jeff Preshing
+
+  Distributed under the Simplified BSD License.
+  Original location: https://github.com/preshing/junction
+
+  This software is distributed WITHOUT ANY WARRANTY; without even the
+  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+  See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#include <junction/Core.h>
+#include <junction/details/Leapfrog.h>
+#include <turf/Heap.h>
+
+namespace junction {
+namespace details {
+
+TURF_TRACE_DEFINE_BEGIN(Leapfrog, 33) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE("[find] called")
+TURF_TRACE_DEFINE("[find] found existing cell optimistically")
+TURF_TRACE_DEFINE("[find] found existing cell")
+TURF_TRACE_DEFINE("[insertOrFind] called")
+TURF_TRACE_DEFINE("[insertOrFind] reserved first cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to reserve first cell")
+TURF_TRACE_DEFINE("[insertOrFind] found in first cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to read hash")
+TURF_TRACE_DEFINE("[insertOrFind] found in probe chain")
+TURF_TRACE_DEFINE("[insertOrFind] reserved cell")
+TURF_TRACE_DEFINE("[insertOrFind] race to reserve cell")
+TURF_TRACE_DEFINE("[insertOrFind] found outside probe chain")
+TURF_TRACE_DEFINE("[insertOrFind] found late-arriving cell in same bucket")
+TURF_TRACE_DEFINE("[insertOrFind] set link on behalf of late-arriving cell")
+TURF_TRACE_DEFINE("[insertOrFind] overflow")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
+TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
+TURF_TRACE_DEFINE("[migrateRange] race to insert key")
+TURF_TRACE_DEFINE("[migrateRange] race to insert value")
+TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect")
+TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected")
+TURF_TRACE_DEFINE("[migrateRange] racing update was erase")
+TURF_TRACE_DEFINE("[migrateRange] race to update migrated value")
+TURF_TRACE_DEFINE("[TableMigration::run] already ended")
+TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set")
+TURF_TRACE_DEFINE("[TableMigration::run] destination overflow")
+TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed")
+TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
+TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
+TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
+TURF_TRACE_DEFINE_END(Leapfrog, 33)
+
+} // namespace details
+} // namespace junction
diff --git a/junction/details/Leapfrog.h b/junction/details/Leapfrog.h
new file mode 100644 (file)
index 0000000..33fd5be
--- /dev/null
@@ -0,0 +1,577 @@
+/*------------------------------------------------------------------------
+  Junction: Concurrent data structures in C++
+  Copyright (c) 2016 Jeff Preshing
+
+  Distributed under the Simplified BSD License.
+  Original location: https://github.com/preshing/junction
+
+  This software is distributed WITHOUT ANY WARRANTY; without even the
+  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+  See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_DETAILS_LEAPFROG_H
+#define JUNCTION_DETAILS_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <turf/Atomic.h>
+#include <turf/Mutex.h>
+#include <turf/ManualResetEvent.h>
+#include <turf/Util.h>
+#include <junction/MapTraits.h>
+#include <turf/Trace.h>
+#include <turf/Heap.h>
+#include <junction/SimpleJobCoordinator.h>
+#include <junction/QSBR.h>
+
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
+
+namespace junction {
+namespace details {
+
+TURF_TRACE_DECLARE(Leapfrog, 33)
+
+template <class Map>
+struct Leapfrog {
+    typedef typename Map::Hash Hash;
+    typedef typename Map::Value Value;
+    typedef typename Map::KeyTraits KeyTraits;
+    typedef typename Map::ValueTraits ValueTraits;
+
+    static const ureg InitialSize = 8;
+    static const ureg TableMigrationUnitSize = 32;
+    static const ureg LinearSearchLimit = 128;
+    static const ureg CellsInUseSample = LinearSearchLimit;
+    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
+    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
+
+    struct Cell {
+        turf::Atomic<Hash> hash;
+        turf::Atomic<Value> value;
+    };
+
+    struct CellGroup {
+        // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
+        // Each cell in the probe chain is located within the table itself.
+        // "deltas" determines the index of the next cell in the probe chain.
+        // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
+        // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
+        // All subsequent cells in the chain is given by deltas 4 - 7. Also guaranteed to belong in the bucket.
+        turf::Atomic<u8> deltas[8];
+        Cell cells[4];
+    };
+
+    struct Table {
+        const ureg sizeMask;                 // a power of two minus one
+        turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
+        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
+
+        Table(ureg sizeMask) : sizeMask(sizeMask) {
+        }
+
+        static Table* create(ureg tableSize) {
+            TURF_ASSERT(turf::util::isPowerOf2(tableSize));
+            TURF_ASSERT(tableSize >= 4);
+            ureg numGroups = tableSize >> 2;
+            Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
+            new (table) Table(tableSize - 1);
+            for (ureg i = 0; i < numGroups; i++) {
+                CellGroup* group = table->getCellGroups() + i;
+                for (ureg j = 0; j < 4; j++) {
+                    group->deltas[j].storeNonatomic(0);
+                    group->deltas[j + 4].storeNonatomic(0);
+                    group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
+                    group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
+                }
+            }
+            return table;
+        }
+
+        void destroy() {
+            this->Table::~Table();
+            TURF_HEAP.free(this);
+        }
+
+        CellGroup* getCellGroups() const {
+            return (CellGroup*) (this + 1);
+        }
+
+        ureg getNumMigrationUnits() const {
+            return sizeMask / TableMigrationUnitSize + 1;
+        }
+    };
+
+    class TableMigration : public SimpleJobCoordinator::Job {
+    public:
+        struct Source {
+            Table* table;
+            turf::Atomic<ureg> sourceIndex;
+        };
+
+        Map& m_map;
+        Table* m_destination;
+        turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+        turf::Atomic<bool> m_overflowed;
+        turf::Atomic<sreg> m_unitsRemaining;
+        ureg m_numSources;
+
+        TableMigration(Map& map) : m_map(map) {
+        }
+
+        static TableMigration* create(Map& map, ureg numSources) {
+            TableMigration* migration =
+                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+            new (migration) TableMigration(map);
+            migration->m_workerStatus.storeNonatomic(0);
+            migration->m_overflowed.storeNonatomic(false);
+            migration->m_unitsRemaining.storeNonatomic(0);
+            migration->m_numSources = numSources;
+            // Caller is responsible for filling in sources & destination
+            return migration;
+        }
+
+        virtual ~TableMigration() TURF_OVERRIDE {
+        }
+
+        void destroy() {
+            // Destroy all source tables.
+            for (ureg i = 0; i < m_numSources; i++)
+                if (getSources()[i].table)
+                    getSources()[i].table->destroy();
+            // Delete the migration object itself.
+            this->TableMigration::~TableMigration();
+            TURF_HEAP.free(this);
+        }
+
+        Source* getSources() const {
+            return (Source*) (this + 1);
+        }
+
+        bool migrateRange(Table* srcTable, ureg startIdx);
+        virtual void run() TURF_OVERRIDE;
+    };
+
+    static Cell* find(Hash hash, Table* table) {
+        TURF_TRACE(Leapfrog, 0, "[find] called", uptr(table), hash);
+        TURF_ASSERT(table);
+        TURF_ASSERT(hash != KeyTraits::NullHash);
+        ureg sizeMask = table->sizeMask;
+        // Optimistically check hashed cell even though it might belong to another bucket
+        ureg idx = hash & sizeMask;
+        CellGroup* group = table->getCellGroups() + (idx >> 2);
+        Cell* cell = group->cells + (idx & 3);
+        Hash probeHash = cell->hash.load(turf::Relaxed);
+        if (probeHash == hash) {
+            TURF_TRACE(Leapfrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
+            return cell;
+        } else if (probeHash == KeyTraits::NullHash) {
+            return cell = NULL;
+        }
+        // Follow probe chain for our bucket
+        u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
+        while (delta) {
+            idx = (idx + delta) & sizeMask;
+            group = table->getCellGroups() + (idx >> 2);
+            cell = group->cells + (idx & 3);
+            Hash probeHash = cell->hash.load(turf::Relaxed);
+            // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
+            // but we don't check for it. We just follow the probe chain.
+            if (probeHash == hash) {
+                TURF_TRACE(Leapfrog, 2, "[find] found existing cell", uptr(table), idx);
+                return cell;
+            }
+            delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
+        }
+        // End of probe chain, not found
+        return NULL;
+    }
+
+    // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
+    enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
+    static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
+        TURF_TRACE(Leapfrog, 3, "[insertOrFind] called", uptr(table), hash);
+        TURF_ASSERT(table);
+        TURF_ASSERT(hash != KeyTraits::NullHash);
+        ureg sizeMask = table->sizeMask;
+        ureg idx = ureg(hash);
+
+        // Check hashed cell first, though it may not even belong to the bucket.
+        CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+        cell = group->cells + (idx & 3);
+        Hash probeHash = cell->hash.load(turf::Relaxed);
+        if (probeHash == KeyTraits::NullHash) {
+            if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
+                TURF_TRACE(Leapfrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
+                // There are no links to set. We're done.
+                return InsertResult_InsertedNew;
+            } else {
+                TURF_TRACE(Leapfrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
+                // Fall through to check if it was the same hash...
+            }
+        }
+        if (probeHash == hash) {
+            TURF_TRACE(Leapfrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
+            return InsertResult_AlreadyFound;
+        }
+
+        // Follow the link chain for this bucket.
+        ureg maxIdx = idx + sizeMask;
+        ureg linkLevel = 0;
+        turf::Atomic<u8>* prevLink;
+        for (;;) {
+        followLink:
+            prevLink = group->deltas + ((idx & 3) + linkLevel);
+            linkLevel = 4;
+            u8 probeDelta = prevLink->load(turf::Relaxed);
+            if (probeDelta) {
+                idx += probeDelta;
+                // Check the hash for this cell.
+                group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+                cell = group->cells + (idx & 3);
+                probeHash = cell->hash.load(turf::Relaxed);
+                if (probeHash == KeyTraits::NullHash) {
+                    // Cell was linked, but hash is not visible yet.
+                    // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
+                    // just poll until it becomes visible.
+                    TURF_TRACE(Leapfrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
+                    do {
+                        probeHash = cell->hash.load(turf::Acquire);
+                    } while (probeHash == KeyTraits::NullHash);
+                }
+                TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
+                if (probeHash == hash) {
+                    TURF_TRACE(Leapfrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
+                    return InsertResult_AlreadyFound;
+                }
+            } else {
+                // Reached the end of the link chain for this bucket.
+                // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
+                ureg prevLinkIdx = idx;
+                TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
+                ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
+                while (linearProbesRemaining-- > 0) {
+                    idx++;
+                    group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+                    cell = group->cells + (idx & 3);
+                    probeHash = cell->hash.load(turf::Relaxed);
+                    if (probeHash == KeyTraits::NullHash) {
+                        // It's an empty cell. Try to reserve it.
+                        if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
+                            // Success. We've reserved the cell. Link it to previous cell in same bucket.
+                            TURF_TRACE(Leapfrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
+                            TURF_ASSERT(probeDelta == 0);
+                            u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
+                            probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
+                            TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
+#else
+                            prevLink->store(desiredDelta, turf::Relaxed);
+#endif
+                            return InsertResult_InsertedNew;
+                        } else {
+                            TURF_TRACE(Leapfrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
+                            // Fall through to check if it's the same hash...
+                        }
+                    }
+                    Hash x = (probeHash ^ hash);
+                    // Check for same hash.
+                    if (!x) {
+                        TURF_TRACE(Leapfrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
+                        return InsertResult_AlreadyFound;
+                    }
+                    // Check for same bucket.
+                    if ((x & sizeMask) == 0) {
+                        TURF_TRACE(Leapfrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
+                        // Attempt to set the link on behalf of the late-arriving cell.
+                        // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
+                        // there's no guarantee that our own link chain will be well-formed by the time this function returns.
+                        // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
+                        u8 desiredDelta = idx - prevLinkIdx;
+#if TURF_WITH_ASSERTS
+                        probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
+                        TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
+                        if (probeDelta == 0)
+                            TURF_TRACE(Leapfrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
+#else
+                        prevLink->store(desiredDelta, turf::Relaxed);
+#endif
+                        goto followLink; // Try to follow link chain for the bucket again.
+                    }
+                    // Continue linear search...
+                }
+                // Table is too full to insert.
+                overflowIdx = idx + 1;
+                TURF_TRACE(Leapfrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
+                return InsertResult_Overflow;
+            }
+        }
+    }
+
+    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
+        // Create new migration by DCLI.
+        TURF_TRACE(Leapfrog, 15, "[beginTableMigrationToSize] called", 0, 0);
+        SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
+        if (job) {
+            TURF_TRACE(Leapfrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
+        } else {
+            turf::LockGuard<turf::Mutex> guard(table->mutex);
+            job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
+            if (job) {
+                TURF_TRACE(Leapfrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
+            } else {
+                // Create new migration.
+                TableMigration* migration = TableMigration::create(map, 1);
+                migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
+                migration->getSources()[0].table = table;
+                migration->getSources()[0].sourceIndex.storeNonatomic(0);
+                migration->m_destination = Table::create(nextTableSize);
+                // Publish the new migration.
+                table->jobCoordinator.storeRelease(migration);
+            }
+        }
+    }
+
+    static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
+        // Estimate number of cells in use based on a small sample.
+        ureg sizeMask = table->sizeMask;
+        ureg idx = overflowIdx - CellsInUseSample;
+        ureg inUseCells = 0;
+        for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
+            CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
+            Cell* cell = group->cells + (idx & 3);
+            Value value = cell->value.load(turf::Relaxed);
+            if (value == Value(ValueTraits::Redirect)) {
+                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+                TURF_TRACE(Leapfrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
+                return;
+            }
+            if (value != Value(ValueTraits::NullValue))
+                inUseCells++;
+            idx++;
+        }
+        float inUseRatio = float(inUseCells) / CellsInUseSample;
+        float estimatedInUse = (sizeMask + 1) * inUseRatio;
+#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
+        // Periodically underestimate the number of cells in use.
+        // This exercises the code that handles overflow during migration.
+        static ureg counter = 1;
+        if ((++counter & 3) == 0) {
+            estimatedInUse /= 4;
+        }
+#endif
+        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+        beginTableMigrationToSize(map, table, nextTableSize);
+    }
+}; // Leapfrog
+
+template <class Map>
+bool Leapfrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
+    ureg srcSizeMask = srcTable->sizeMask;
+    ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
+    // Iterate over source range.
+    for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
+        CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
+        Cell* srcCell = srcGroup->cells + (srcIdx & 3);
+        Hash srcHash;
+        Value srcValue;
+        // Fetch the srcHash and srcValue.
+        for (;;) {
+            srcHash = srcCell->hash.load(turf::Relaxed);
+            if (srcHash == KeyTraits::NullHash) {
+                // An unused cell. Try to put a Redirect marker in its value.
+                srcValue =
+                    srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
+                if (srcValue == Value(ValueTraits::Redirect)) {
+                    // srcValue is already marked Redirect due to previous incomplete migration.
+                    TURF_TRACE(Leapfrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
+                    break;
+                }
+                if (srcValue == Value(ValueTraits::NullValue))
+                    break; // Redirect has been placed. Break inner loop, continue outer loop.
+                TURF_TRACE(Leapfrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
+                // Otherwise, somebody just claimed the cell. Read srcHash again...
+            } else {
+                // Check for deleted/uninitialized value.
+                srcValue = srcCell->value.load(turf::Relaxed);
+                if (srcValue == Value(ValueTraits::NullValue)) {
+                    // Try to put a Redirect marker.
+                    if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
+                        break; // Redirect has been placed. Break inner loop, continue outer loop.
+                    TURF_TRACE(Leapfrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+                    if (srcValue == Value(ValueTraits::Redirect)) {
+                        // FIXME: I don't think this will happen. Investigate & change to assert
+                        TURF_TRACE(Leapfrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+                        break;
+                    }
+                } else if (srcValue == Value(ValueTraits::Redirect)) {
+                    // srcValue is already marked Redirect due to previous incomplete migration.
+                    TURF_TRACE(Leapfrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+                    break;
+                }
+
+                // We've got a key/value pair to migrate.
+                // Reserve a destination cell in the destination.
+                TURF_ASSERT(srcHash != KeyTraits::NullHash);
+                TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
+                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
+                Cell* dstCell;
+                ureg overflowIdx;
+                InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
+                // During migration, a hash can only exist in one place among all the source tables,
+                // and it is only migrated by one thread. Therefore, the hash will never already exist
+                // in the destination table:
+                TURF_ASSERT(result != InsertResult_AlreadyFound);
+                if (result == InsertResult_Overflow) {
+                    // Destination overflow.
+                    // This can happen for several reasons. For example, the source table could have
+                    // existed of all deleted cells when it overflowed, resulting in a small destination
+                    // table size, but then another thread could re-insert all the same hashes
+                    // before the migration completed.
+                    // Caller will cancel the current migration and begin a new one.
+                    return false;
+                }
+                // Migrate the old value to the new cell.
+                for (;;) {
+                    // Copy srcValue to the destination.
+                    dstCell->value.store(srcValue, turf::Relaxed);
+                    // Try to place a Redirect marker in srcValue.
+                    Value doubleCheckedSrcValue =
+                        srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
+                    TURF_ASSERT(doubleCheckedSrcValue !=
+                                Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
+                    if (doubleCheckedSrcValue == srcValue) {
+                        // No racing writes to the src. We've successfully placed the Redirect marker.
+                        // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
+                        // by a late-arriving erase.
+                        if (srcValue == Value(ValueTraits::NullValue))
+                            TURF_TRACE(Leapfrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
+                        break;
+                    }
+                    // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
+                    TURF_TRACE(Leapfrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
+                    srcValue = doubleCheckedSrcValue;
+                }
+                // Cell successfully migrated. Proceed to next source cell.
+                break;
+            }
+        }
+    }
+    // Range has been migrated successfully.
+    return true;
+}
+
+template <class Map>
+void Leapfrog<Map>::TableMigration::run() {
+    // Conditionally increment the shared # of workers.
+    ureg probeStatus = m_workerStatus.load(turf::Relaxed);
+    do {
+        if (probeStatus & 1) {
+            // End flag is already set, so do nothing.
+            TURF_TRACE(Leapfrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
+            return;
+        }
+    } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
+    // # of workers has been incremented, and the end flag is clear.
+    TURF_ASSERT((probeStatus & 1) == 0);
+
+    // Iterate over all source tables.
+    for (ureg s = 0; s < m_numSources; s++) {
+        Source& source = getSources()[s];
+        // Loop over all migration units in this source table.
+        for (;;) {
+            if (m_workerStatus.load(turf::Relaxed) & 1) {
+                TURF_TRACE(Leapfrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
+                goto endMigration;
+            }
+            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+            if (startIdx >= source.table->sizeMask + 1)
+                break; // No more migration units in this table. Try next source table.
+            bool overflowed = !migrateRange(source.table, startIdx);
+            if (overflowed) {
+                // *** FAILED MIGRATION ***
+                // TableMigration failed due to destination table overflow.
+                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+                // hence m_unitsRemaining won't reach zero.
+                // However, multiple threads can independently detect a failed migration at the same time.
+                TURF_TRACE(Leapfrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+                // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
+                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+                // the thread
+                // that deals with it.
+                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+                if (oldOverflowed)
+                    TURF_TRACE(Leapfrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+                               uptr(oldOverflowed));
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
+            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+            TURF_ASSERT(prevRemaining > 0);
+            if (prevRemaining == 1) {
+                // *** SUCCESSFUL MIGRATION ***
+                // That was the last chunk to migrate.
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
+        }
+    }
+    TURF_TRACE(Leapfrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);
+
+endMigration:
+    // Decrement the shared # of workers.
+    probeStatus = m_workerStatus.fetchSub(
+        2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
+    if (probeStatus >= 4) {
+        // There are other workers remaining. Return here so that only the very last worker will proceed.
+        TURF_TRACE(Leapfrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+        return;
+    }
+
+    // We're the very last worker thread.
+    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
+    TURF_ASSERT(probeStatus == 3);
+    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+    if (!overflowed) {
+        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+        m_map.publishTableMigration(this);
+        // End the jobCoodinator.
+        getSources()[0].table->jobCoordinator.end();
+    } else {
+        // The migration failed due to the overflow of the destination table.
+        Table* origTable = getSources()[0].table;
+        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
+        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
+        if (checkedJob != this) {
+            TURF_TRACE(Leapfrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+                       uptr(checkedJob));
+        } else {
+            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
+            // Double the destination table size.
+            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
+            // Transfer source tables to the new migration.
+            for (ureg i = 0; i < m_numSources; i++) {
+                migration->getSources()[i].table = getSources()[i].table;
+                getSources()[i].table = NULL;
+                migration->getSources()[i].sourceIndex.storeNonatomic(0);
+            }
+            migration->getSources()[m_numSources].table = m_destination;
+            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
+            // Calculate total number of migration units to move.
+            ureg unitsRemaining = 0;
+            for (ureg s = 0; s < migration->m_numSources; s++)
+                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
+            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
+            // Publish the new migration.
+            origTable->jobCoordinator.storeRelease(migration);
+        }
+    }
+
+    // We're done with this TableMigration. Queue it for GC.
+    DefaultQSBR.enqueue(&TableMigration::destroy, this);
+}
+
+} // namespace details
+} // namespace junction
+
+#endif // JUNCTION_DETAILS_LEAPFROG_H
diff --git a/junction/extra/impl/MapAdapter_LeapFrog.h b/junction/extra/impl/MapAdapter_LeapFrog.h
deleted file mode 100644 (file)
index c277044..0000000
+++ /dev/null
@@ -1,62 +0,0 @@
-/*------------------------------------------------------------------------
-  Junction: Concurrent data structures in C++
-  Copyright (c) 2016 Jeff Preshing
-
-  Distributed under the Simplified BSD License.
-  Original location: https://github.com/preshing/junction
-
-  This software is distributed WITHOUT ANY WARRANTY; without even the
-  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
-  See the LICENSE file for more information.
-------------------------------------------------------------------------*/
-
-#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
-#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
-
-#include <junction/Core.h>
-#include <junction/QSBR.h>
-#include <junction/ConcurrentMap_LeapFrog.h>
-#include <turf/Util.h>
-
-namespace junction {
-namespace extra {
-
-class MapAdapter {
-public:
-    static TURF_CONSTEXPR const char* MapName = "Junction LeapFrog map";
-
-    MapAdapter(ureg) {
-    }
-
-    class ThreadContext {
-    private:
-        QSBR::Context m_qsbrContext;
-
-    public:
-        ThreadContext(MapAdapter&, ureg) {
-        }
-
-        void registerThread() {
-            m_qsbrContext = DefaultQSBR.createContext();
-        }
-
-        void unregisterThread() {
-            DefaultQSBR.destroyContext(m_qsbrContext);
-        }
-
-        void update() {
-            DefaultQSBR.update(m_qsbrContext);
-        }
-    };
-
-    typedef ConcurrentMap_LeapFrog<u32, void*> Map;
-
-    static ureg getInitialCapacity(ureg maxPopulation) {
-        return turf::util::roundUpPowerOf2(maxPopulation / 4);
-    }
-};
-
-} // namespace extra
-} // namespace junction
-
-#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
diff --git a/junction/extra/impl/MapAdapter_Leapfrog.h b/junction/extra/impl/MapAdapter_Leapfrog.h
new file mode 100644 (file)
index 0000000..7661725
--- /dev/null
@@ -0,0 +1,62 @@
+/*------------------------------------------------------------------------
+  Junction: Concurrent data structures in C++
+  Copyright (c) 2016 Jeff Preshing
+
+  Distributed under the Simplified BSD License.
+  Original location: https://github.com/preshing/junction
+
+  This software is distributed WITHOUT ANY WARRANTY; without even the
+  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
+  See the LICENSE file for more information.
+------------------------------------------------------------------------*/
+
+#ifndef JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
+#define JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
+
+#include <junction/Core.h>
+#include <junction/QSBR.h>
+#include <junction/ConcurrentMap_Leapfrog.h>
+#include <turf/Util.h>
+
+namespace junction {
+namespace extra {
+
+class MapAdapter {
+public:
+    static TURF_CONSTEXPR const char* MapName = "Junction Leapfrog map";
+
+    MapAdapter(ureg) {
+    }
+
+    class ThreadContext {
+    private:
+        QSBR::Context m_qsbrContext;
+
+    public:
+        ThreadContext(MapAdapter&, ureg) {
+        }
+
+        void registerThread() {
+            m_qsbrContext = DefaultQSBR.createContext();
+        }
+
+        void unregisterThread() {
+            DefaultQSBR.destroyContext(m_qsbrContext);
+        }
+
+        void update() {
+            DefaultQSBR.update(m_qsbrContext);
+        }
+    };
+
+    typedef ConcurrentMap_Leapfrog<u32, void*> Map;
+
+    static ureg getInitialCapacity(ureg maxPopulation) {
+        return turf::util::roundUpPowerOf2(maxPopulation / 4);
+    }
+};
+
+} // namespace extra
+} // namespace junction
+
+#endif // JUNCTION_EXTRA_IMPL_MAPADAPTER_LEAPFROG_H
index 1dfc5fa56114b509ca1a519efd5ed88d7dceffb1..f84ef12fe3a83ee6770e92d4c0a05d158468a0a5 100644 (file)
@@ -10,7 +10,7 @@ CMAKE = os.getenv('CMAKE', 'cmake')
 ALL_MAPS = [
     ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1']),
     ('linear', 'junction/extra/impl/MapAdapter_Linear.h', []),
-    ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', []),
+    ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', []),
     ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', []),
     ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', []),
     ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1']),
index 1e769845fc6535a06a1b321ebd67d5d540dea4b4..792e196bbb4b1003b59f967a4ded2b02a149e22d 100644 (file)
@@ -11,7 +11,7 @@ ALL_MAPS = [
     ('null', 'junction/extra/impl/MapAdapter_Null.h', [], ['-i256', '-c10']),
     ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c10']),
     ('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i256', '-c10']),
-    ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i256', '-c10']),
+    ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i256', '-c10']),
     ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i256', '-c10']),
     ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i256', '-c10']),
     ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i256', '-c1']),
index bc9cbd5aef2cbd4abb9e4a3d905212cb8d3d4cb6..83359b68314d1ffda8dff0ceaedcea2045aeb8c0 100644 (file)
@@ -10,7 +10,7 @@ CMAKE = os.getenv('CMAKE', 'cmake')
 ALL_MAPS = [
     ('michael', 'junction/extra/impl/MapAdapter_CDS_Michael.h', ['-DJUNCTION_WITH_CDS=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i10000', '-c200']),
     ('linear', 'junction/extra/impl/MapAdapter_Linear.h', [], ['-i10000', '-c200']),
-    ('leapfrog', 'junction/extra/impl/MapAdapter_LeapFrog.h', [], ['-i10000', '-c200']),
+    ('leapfrog', 'junction/extra/impl/MapAdapter_Leapfrog.h', [], ['-i10000', '-c200']),
     ('grampa', 'junction/extra/impl/MapAdapter_Grampa.h', [], ['-i10000', '-c200']),
     ('stdmap', 'junction/extra/impl/MapAdapter_StdMap.h', [], ['-i10000', '-c10']),
     ('folly', 'junction/extra/impl/MapAdapter_Folly.h', ['-DJUNCTION_WITH_FOLLY=1', '-DTURF_WITH_EXCEPTIONS=1'], ['-i2000', '-c1']),