Improve ConcurrentMap_Linear scalability
authorJeff Preshing <filter-github@preshing.com>
Thu, 11 Feb 2016 17:27:24 +0000 (12:27 -0500)
committerJeff Preshing <filter-github@preshing.com>
Thu, 11 Feb 2016 17:27:24 +0000 (12:27 -0500)
Removed shared variable Table::valuesRemaining. Instead, detect and
handle overflows during migration, like ConcurrentMap_LeapFrog.

Added #defines to force migration overflows during testing:
JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS

6 cores runs ~57% faster on Win32.

junction/ConcurrentMap_LeapFrog.h
junction/ConcurrentMap_Linear.cpp
junction/ConcurrentMap_Linear.h
junction/details/Grampa.h
junction/details/LeapFrog.h
junction/details/Linear.cpp
junction/details/Linear.h

index a9dc4a33369f61b15187999daac0802b417e4276..389ce4088e145e2f5b631582083002ee6ce2b539 100644 (file)
@@ -251,7 +251,7 @@ public:
     // Lookup without creating a temporary Mutator.
     Value get(Key key) {
         Hash hash = KeyTraits::hash(key);
-        TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(hash), 0);
+        TURF_TRACE(ConcurrentMap_LeapFrog, 15, "[get] called", uptr(this), uptr(hash));
         for (;;) {
             typename Details::Table* table = m_root.load(turf::Consume);
             typename Details::Cell* cell = Details::find(hash, table);
index 41fbb1e4c57e031c4df229d7630c036bd3d76a09..90819e2c9fa102da1d53e1ae3b4ee3e0e2d70610 100644 (file)
 
 namespace junction {
 
-TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 18) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE_BEGIN(ConcurrentMap_Linear, 17) // autogenerated by TidySource.py
 TURF_TRACE_DEFINE("[Mutator] find constructor called")
 TURF_TRACE_DEFINE("[Mutator] find was redirected")
 TURF_TRACE_DEFINE("[Mutator] insert constructor called")
 TURF_TRACE_DEFINE("[Mutator] insert was redirected")
 TURF_TRACE_DEFINE("[Mutator::exchangeValue] called")
-TURF_TRACE_DEFINE("[Mutator::exchangeValue] ran out of valuesRemaining")
 TURF_TRACE_DEFINE("[Mutator::exchangeValue] exchanged Value")
 TURF_TRACE_DEFINE("[Mutator::exchangeValue] detected race to write value")
 TURF_TRACE_DEFINE("[Mutator::exchangeValue] racing write inserted new value")
@@ -33,6 +32,6 @@ TURF_TRACE_DEFINE("[Mutator::eraseValue] was redirected")
 TURF_TRACE_DEFINE("[Mutator::eraseValue] was re-redirected")
 TURF_TRACE_DEFINE("[get] called")
 TURF_TRACE_DEFINE("[get] was redirected")
-TURF_TRACE_DEFINE_END(ConcurrentMap_Linear, 18)
+TURF_TRACE_DEFINE_END(ConcurrentMap_Linear, 17)
 
 } // namespace junction
index 4cf349d062c1c64eda17a35bf45031206271b4b3..bf6c1eaa305b9c3cab88808409c6224375ba0ce0 100644 (file)
@@ -21,7 +21,7 @@
 
 namespace junction {
 
-TURF_TRACE_DECLARE(ConcurrentMap_Linear, 18)
+TURF_TRACE_DECLARE(ConcurrentMap_Linear, 17)
 
 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V> >
 class ConcurrentMap_Linear {
@@ -37,9 +37,7 @@ private:
     turf::Atomic<typename Details::Table*> m_root;
 
 public:
-    ConcurrentMap_Linear(ureg capacity = Details::InitialSize) {
-        ureg limitNumValues = capacity * 3 / 4;
-        m_root.storeNonatomic(Details::Table::create(capacity, limitNumValues));
+    ConcurrentMap_Linear(ureg capacity = Details::InitialSize) : m_root(Details::Table::create(capacity)) {
     }
 
     ~ConcurrentMap_Linear() {
@@ -53,7 +51,7 @@ public:
         // There are no racing calls to this function.
         typename Details::Table* oldRoot = m_root.loadNonatomic();
         m_root.store(migration->m_destination, turf::Release);
-        TURF_ASSERT(oldRoot == migration->m_source);
+        TURF_ASSERT(oldRoot == migration->getSources()[0].table);
         // Caller will GC the TableMigration and the source table.
     }
 
@@ -87,7 +85,7 @@ public:
                 if (m_value != Value(ValueTraits::Redirect))
                     return; // Found an existing value
                 // We've encountered a Redirect value. Help finish the migration.
-                TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0));
+                TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), 0);
                 m_table->jobCoordinator.participate();
                 // Try again using the latest root.
             }
@@ -138,52 +136,27 @@ public:
             TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
             for (;;) {
                 Value oldValue = m_value;
-                s32 prevRemainingValues = s32(-1);
-                if (oldValue == Value(ValueTraits::NullValue)) {
-                    // It's a deleted or newly initialized cell.
-                    // Decrement remainingValues to ensure we have permission to (re)insert a value.
-                    prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed);
-                    if (prevRemainingValues <= 0) {
-                        TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table),
-                                   prevRemainingValues);
-                        // Can't (re)insert any more values.
-                        // There are two ways this can happen. One with a TableMigration already in progress, and one without:
-                        // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert.
-                        // 2. Two threads race to insert the same key, and it's the last free cell in the table.
-                        // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration
-                        // failure,
-                        // as LeapFrog and Grampa do...)
-                        m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
-                        // Since we don't know whether there's already a TableMigration in progress, always attempt to start one
-                        // here:
-                        Details::beginTableMigration(m_map, m_table);
-                        goto helpMigrate;
-                    }
-                }
                 if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
                     // Exchange was successful. Return previous value.
-                    TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
+                    TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
                     Value result = m_value;
                     m_value = desired; // Leave the mutator in a valid state
                     return result;
                 }
                 // The CAS failed and m_value has been updated with the latest value.
                 if (m_value != Value(ValueTraits::Redirect)) {
-                    TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
+                    TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
                                uptr(m_value));
                     if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
-                        TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value",
+                        TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] racing write inserted new value",
                                    uptr(m_table), uptr(m_value));
-                        m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
                     }
                     // There was a racing write (or erase) to this cell.
                     // Pretend we exchanged with ourselves, and just let the racing write win.
                     return desired;
                 }
                 // We've encountered a Redirect value. Help finish the migration.
-                TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
-            helpMigrate:
-                // Help migrate to new table.
+                TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
                 Hash hash = m_cell->hash.load(turf::Relaxed);
                 for (;;) {
                     // Help complete the migration.
@@ -195,7 +168,7 @@ public:
                     case Details::InsertResult_AlreadyFound:
                         m_value = m_cell->value.load(turf::Consume);
                         if (m_value == Value(ValueTraits::Redirect)) {
-                            TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
+                            TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
                                        uptr(m_value));
                             break;
                         }
@@ -203,8 +176,7 @@ public:
                     case Details::InsertResult_InsertedNew:
                         goto breakOuter;
                     case Details::InsertResult_Overflow:
-                        TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
-                                   0);
+                        TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), 0);
                         Details::beginTableMigration(m_map, m_table);
                         break;
                     }
@@ -221,7 +193,7 @@ public:
 
         Value eraseValue() {
             TURF_ASSERT(m_cell); // Cell must have been found or inserted
-            TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
+            TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
             for (;;) {
                 if (m_value == Value(ValueTraits::NullValue))
                     return Value(m_value);
@@ -229,13 +201,12 @@ public:
                 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
                     // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
                     TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
-                    m_table->valuesRemaining.fetchAdd(1, turf::Relaxed);
                     Value result = m_value;
                     m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
                     return result;
                 }
                 // The CAS failed and m_value has been updated with the latest value.
-                TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
+                TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
                            m_cell - m_table->getCells());
                 if (m_value != Value(ValueTraits::Redirect)) {
                     // There was a racing write (or erase) to this cell.
@@ -243,7 +214,7 @@ public:
                     return Value(ValueTraits::NullValue);
                 }
                 // We've been redirected to a new table.
-                TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table),
+                TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] was redirected", uptr(m_table),
                            m_cell - m_table->getCells());
                 Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
                 for (;;) {
@@ -259,7 +230,7 @@ public:
                     m_value = m_cell->value.load(turf::Relaxed);
                     if (m_value != Value(ValueTraits::Redirect))
                         break;
-                    TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
+                    TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
                                m_cell - m_table->getCells());
                 }
             }
@@ -277,7 +248,7 @@ public:
     // Lookup without creating a temporary Mutator.
     Value get(Key key) {
         Hash hash = KeyTraits::hash(key);
-        TURF_TRACE(ConcurrentMap_Linear, 16, "[get] called", uptr(this), uptr(hash));
+        TURF_TRACE(ConcurrentMap_Linear, 15, "[get] called", uptr(this), uptr(hash));
         for (;;) {
             typename Details::Table* table = m_root.load(turf::Consume);
             typename Details::Cell* cell = Details::find(hash, table);
@@ -287,7 +258,7 @@ public:
             if (value != Value(ValueTraits::Redirect))
                 return value; // Found an existing value
             // We've been redirected to a new table. Help with the migration.
-            TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell));
+            TURF_TRACE(ConcurrentMap_Linear, 16, "[get] was redirected", uptr(table), uptr(cell));
             table->jobCoordinator.participate();
             // Try again in the new table.
         }
index 9f85df249fba655a458c13ab21708e5a93cf4879..2fe2cc226f992a3ad73bdd93cbae5188d33917f1 100644 (file)
@@ -696,10 +696,8 @@ void Grampa<Map>::TableMigration::run() {
                 // However, multiple threads can independently detect a failed migration at the same time.
                 TURF_TRACE(Grampa, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
                 // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads
-                // before
-                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
-                // the thread
-                // that deals with it.
+                // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often
+                // different from the thread that deals with it.
                 // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins.
                 sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed);
                 if (oldIndex >= 0)
index 800d7374e2dcc1470c95e90c03ed090d54f2a7b7..75d91fbeb34e5fd7c2e8a4ed4773d5960320e127 100644 (file)
@@ -24,6 +24,9 @@
 #include <junction/SimpleJobCoordinator.h>
 #include <junction/QSBR.h>
 
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0
+
 namespace junction {
 namespace details {
 
@@ -349,7 +352,15 @@ struct LeapFrog {
         }
         float inUseRatio = float(inUseCells) / CellsInUseSample;
         float estimatedInUse = (sizeMask + 1) * inUseRatio;
-        ureg nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2));
+#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
+        // Periodically underestimate the number of cells in use.
+        // This exercises the code that handles overflow during migration.
+        static ureg counter = 1;
+        if ((++counter & 3) == 0) {
+            estimatedInUse /= 4;
+        }
+#endif
+        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
         beginTableMigrationToSize(map, table, nextTableSize);
     }
 }; // LeapFrog
index d731567d15234a39427fcc1a6a722d36d084833c..299f6549cfc062c0123d9c93062240ba326da468 100644 (file)
@@ -17,7 +17,7 @@
 namespace junction {
 namespace details {
 
-TURF_TRACE_DEFINE_BEGIN(Linear, 22) // autogenerated by TidySource.py
+TURF_TRACE_DEFINE_BEGIN(Linear, 26) // autogenerated by TidySource.py
 TURF_TRACE_DEFINE("[find] called")
 TURF_TRACE_DEFINE("[find] found existing cell")
 TURF_TRACE_DEFINE("[insert] called")
@@ -26,21 +26,25 @@ TURF_TRACE_DEFINE("[insert] ran out of cellsRemaining")
 TURF_TRACE_DEFINE("[insert] reserved cell")
 TURF_TRACE_DEFINE("[insert] detected race to reserve cell")
 TURF_TRACE_DEFINE("[insert] race reserved same hash")
-TURF_TRACE_DEFINE("[beginTableMigration] called")
-TURF_TRACE_DEFINE("[beginTableMigration] new migration already exists")
-TURF_TRACE_DEFINE("[beginTableMigration] new migration already exists (double-checked)")
-TURF_TRACE_DEFINE("[table] restarting valuesRemaining CAS loop")
-TURF_TRACE_DEFINE("[table] valuesRemaining CAS failed")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] called")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists")
+TURF_TRACE_DEFINE("[beginTableMigrationToSize] new migration already exists (double-checked)")
+TURF_TRACE_DEFINE("[beginTableMigration] redirected while determining table size")
 TURF_TRACE_DEFINE("[migrateRange] empty cell already redirected")
 TURF_TRACE_DEFINE("[migrateRange] race to insert key")
 TURF_TRACE_DEFINE("[migrateRange] race to insert value")
+TURF_TRACE_DEFINE("[migrateRange] race inserted Redirect")
+TURF_TRACE_DEFINE("[migrateRange] in-use cell already redirected")
 TURF_TRACE_DEFINE("[migrateRange] racing update was erase")
 TURF_TRACE_DEFINE("[migrateRange] race to update migrated value")
 TURF_TRACE_DEFINE("[TableMigration::run] already ended")
 TURF_TRACE_DEFINE("[TableMigration::run] detected end flag set")
+TURF_TRACE_DEFINE("[TableMigration::run] destination overflow")
+TURF_TRACE_DEFINE("[TableMigration::run] race to set m_overflowed")
 TURF_TRACE_DEFINE("[TableMigration::run] out of migration units")
 TURF_TRACE_DEFINE("[TableMigration::run] not the last worker")
-TURF_TRACE_DEFINE_END(Linear, 22)
+TURF_TRACE_DEFINE("[TableMigration::run] a new TableMigration was already started")
+TURF_TRACE_DEFINE_END(Linear, 26)
 
 } // namespace details
 } // namespace junction
index 991f853f708f6e1a528c91ca47e5848fe9d42821..40fc450980b62cdc8212768934a9f0b725e76abd 100644 (file)
 #include <junction/SimpleJobCoordinator.h>
 #include <junction/QSBR.h>
 
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
+
 namespace junction {
 namespace details {
 
-TURF_TRACE_DECLARE(Linear, 22)
+TURF_TRACE_DECLARE(Linear, 26)
 
 template <class Map>
 struct Linear {
@@ -38,10 +41,7 @@ struct Linear {
 
     static const ureg InitialSize = 8;
     static const ureg TableMigrationUnitSize = 32;
-    static const ureg LinearSearchLimit = 128;
-    static const ureg CellsInUseSample = LinearSearchLimit;
-    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
-    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
+    static const ureg CellsInUseSample = 256;
 
     struct Cell {
         turf::Atomic<Hash> hash;
@@ -50,21 +50,17 @@ struct Linear {
 
     struct Table {
         const ureg sizeMask; // a power of two minus one
-        const ureg limitNumValues;
         turf::Atomic<sreg> cellsRemaining;
-        turf::Atomic<sreg> valuesRemaining;
         turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
 
-        Table(ureg sizeMask, ureg limitNumValues)
-            : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
-              valuesRemaining(limitNumValues) {
+        Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
         }
 
-        static Table* create(ureg tableSize, ureg limitNumValues) {
+        static Table* create(ureg tableSize) {
             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
-            new (table) Table(tableSize - 1, limitNumValues);
+            new (table) Table(tableSize - 1);
             for (ureg j = 0; j < tableSize; j++) {
                 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
                 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
@@ -88,27 +84,51 @@ struct Linear {
 
     class TableMigration : public SimpleJobCoordinator::Job {
     public:
+        struct Source {
+            Table* table;
+            turf::Atomic<ureg> sourceIndex;
+        };
+
         Map& m_map;
-        Table* m_source;
-        turf::Atomic<ureg> m_sourceIndex;
         Table* m_destination;
         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+        turf::Atomic<bool> m_overflowed;
         turf::Atomic<sreg> m_unitsRemaining;
+        ureg m_numSources;
+
+        TableMigration(Map& map) : m_map(map) {
+        }
 
-        TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
-            // Caller is responsible for filling in source & destination
+        static TableMigration* create(Map& map, ureg numSources) {
+            TableMigration* migration =
+                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+            new (migration) TableMigration(map);
+            migration->m_workerStatus.storeNonatomic(0);
+            migration->m_overflowed.storeNonatomic(false);
+            migration->m_unitsRemaining.storeNonatomic(0);
+            migration->m_numSources = numSources;
+            // Caller is responsible for filling in sources & destination
+            return migration;
         }
 
         virtual ~TableMigration() TURF_OVERRIDE {
-            // Destroy source table.
-            m_source->destroy();
         }
 
         void destroy() {
-            delete this;
+            // Destroy all source tables.
+            for (ureg i = 0; i < m_numSources; i++)
+                if (getSources()[i].table)
+                    getSources()[i].table->destroy();
+            // Delete the migration object itself.
+            this->TableMigration::~TableMigration();
+            TURF_HEAP.free(this);
+        }
+
+        Source* getSources() const {
+            return (Source*) (this + 1);
         }
 
-        bool migrateRange(ureg startIdx);
+        bool migrateRange(Table* srcTable, ureg startIdx);
         virtual void run() TURF_OVERRIDE;
     };
 
@@ -150,7 +170,7 @@ struct Linear {
             }
             if (probeHash == KeyTraits::NullHash) {
                 // It's an empty cell. Try to reserve it.
-                // But first, decrement cellsRemaining to ensure we have permission to create new getCells().
+                // But first, decrement cellsRemaining to ensure we have permission to create new cells.
                 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
                 if (prevCellsRemaining <= 0) {
                     // Table is overpopulated.
@@ -177,71 +197,69 @@ struct Linear {
         }
     }
 
-    static void beginTableMigration(Map& map, Table* table) {
+    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
         // Create new migration by DCLI.
-        TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0);
+        TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
         if (job) {
-            TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
+            TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
         } else {
             turf::LockGuard<turf::Mutex> guard(table->mutex);
             job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
             if (job) {
-                TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
+                TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
             } else {
-                // Determine new migration size and cap the number of values that can be added concurrent to the migration.
-                sreg oldValuesLimit = table->limitNumValues;
-                sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed);
-                sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining;
-            calculateNextTableSize:
-                sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2);
-                sreg nextLimitNumValues = nextTableSize * 3 / 4;
-                if (nextLimitNumValues < oldValuesLimit) {
-                    // Set the new limitNumValues on the *current* table.
-                    // This prevents other threads, while the migration is in progress, from concurrently
-                    // re-inserting more values than the new table can hold.
-                    // To set the new limitNumValues on the current table in an atomic fashion,
-                    // we update its valuesRemaining via CAS loop:
-                    for (;;) {
-                        // We must recalculate desiredValuesRemaining on each iteration of the CAS loop
-                        oldValuesInUse = oldValuesLimit - oldValuesRemaining;
-                        sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
-                        if (desiredValuesRemaining < 0) {
-                            TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
-                                       desiredValuesRemaining);
-                            // Must recalculate nextTableSize. Goto, baby!
-                            goto calculateNextTableSize;
-                        }
-                        if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
-                                                                       turf::Relaxed))
-                            break; // Success!
-                        // CAS failed because table->valuesRemaining was modified by another thread.
-                        // An updated value has been reloaded into oldValuesRemaining (modified by reference).
-                        // Recalculate desiredValuesRemaining to account for the updated value, and try again.
-                        TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining);
-                    }
-                }
-                // Now we are assured that the new table will not become overpopulated during the migration process.
                 // Create new migration.
-                TableMigration* migration = new TableMigration(map);
-                migration->m_source = table;
-                migration->m_destination = Table::create(nextTableSize, nextLimitNumValues);
+                TableMigration* migration = TableMigration::create(map, 1);
                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
+                migration->getSources()[0].table = table;
+                migration->getSources()[0].sourceIndex.storeNonatomic(0);
+                migration->m_destination = Table::create(nextTableSize);
                 // Publish the new migration.
                 table->jobCoordinator.storeRelease(migration);
             }
         }
     }
+
+    static void beginTableMigration(Map& map, Table* table) {
+        // Estimate number of cells in use based on a small sample.
+        ureg idx = 0;
+        ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
+        ureg inUseCells = 0;
+        for (; idx < sampleSize; idx++) {
+            Cell* cell = table->getCells() + idx;
+            Value value = cell->value.load(turf::Relaxed);
+            if (value == Value(ValueTraits::Redirect)) {
+                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+                TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
+                return;
+            }
+            if (value != Value(ValueTraits::NullValue))
+                inUseCells++;
+        }
+        float inUseRatio = float(inUseCells) / sampleSize;
+        float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
+#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
+        // Periodically underestimate the number of cells in use.
+        // This exercises the code that handles overflow during migration.
+        static ureg counter = 1;
+        if ((++counter & 3) == 0) {
+            estimatedInUse /= 4;
+        }
+#endif
+        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+        beginTableMigrationToSize(map, table, nextTableSize);
+    }
 }; // Linear
 
 template <class Map>
-bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
-    ureg srcSizeMask = m_source->sizeMask;
+bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
+    ureg srcSizeMask = srcTable->sizeMask;
     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
     sreg valuesMigrated = 0;
     // Iterate over source range.
     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
-        Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask);
+        Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
         Hash srcHash;
         Value srcValue;
         // Fetch the srcHash and srcValue.
@@ -253,12 +271,12 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
                 if (srcValue == Value(ValueTraits::Redirect)) {
                     // srcValue is already marked Redirect due to previous incomplete migration.
-                    TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
                     break;
                 }
                 if (srcValue == Value(ValueTraits::NullValue))
                     break; // Redirect has been placed. Break inner loop, continue outer loop.
-                TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
+                TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
                 // Otherwise, somebody just claimed the cell. Read srcHash again...
             } else {
                 // Check for deleted/uninitialized value.
@@ -267,21 +285,38 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                     // Try to put a Redirect marker.
                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
                         break; // Redirect has been placed. Break inner loop, continue outer loop.
-                    TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+                    if (srcValue == Value(ValueTraits::Redirect)) {
+                        // FIXME: I don't think this will happen. Investigate & change to assert
+                        TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+                        break;
+                    }
+                } else if (srcValue == Value(ValueTraits::Redirect)) {
+                    // srcValue is already marked Redirect due to previous incomplete migration.
+                    TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+                    break;
                 }
 
                 // We've got a key/value pair to migrate.
                 // Reserve a destination cell in the destination.
                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
-                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
                 Cell* dstCell;
                 InsertResult result = insert(srcHash, m_destination, dstCell);
                 // During migration, a hash can only exist in one place among all the source tables,
                 // and it is only migrated by one thread. Therefore, the hash will never already exist
                 // in the destination table:
                 TURF_ASSERT(result != InsertResult_AlreadyFound);
-                TURF_ASSERT(result != InsertResult_Overflow);
+                if (result == InsertResult_Overflow) {
+                    // Destination overflow.
+                    // This can happen for several reasons. For example, the source table could have
+                    // existed of all deleted cells when it overflowed, resulting in a small destination
+                    // table size, but then another thread could re-insert all the same hashes
+                    // before the migration completed.
+                    // Caller will cancel the current migration and begin a new one.
+                    return false;
+                }
                 // Migrate the old value to the new cell.
                 for (;;) {
                     // Copy srcValue to the destination.
@@ -296,13 +331,11 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                         // by a late-arriving erase.
                         if (srcValue == Value(ValueTraits::NullValue))
-                            TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
-                        else
-                            valuesMigrated++;
+                            TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
                         break;
                     }
                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
-                    TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
                     srcValue = doubleCheckedSrcValue;
                 }
                 // Cell successfully migrated. Proceed to next source cell.
@@ -310,9 +343,6 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
             }
         }
     }
-    sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
-    TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
-    TURF_UNUSED(prevValuesRemaining);
     // Range has been migrated successfully.
     return true;
 }
@@ -324,32 +354,55 @@ void Linear<Map>::TableMigration::run() {
     do {
         if (probeStatus & 1) {
             // End flag is already set, so do nothing.
-            TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
+            TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
             return;
         }
     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
     // # of workers has been incremented, and the end flag is clear.
     TURF_ASSERT((probeStatus & 1) == 0);
 
-    // Loop over all migration units in the source table.
-    for (;;) {
-        if (m_workerStatus.load(turf::Relaxed) & 1) {
-            TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
-            goto endMigration;
-        }
-        ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
-        if (startIdx >= m_source->sizeMask + 1)
-            break; // No more migration units.
-        migrateRange(startIdx);
-        sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
-        TURF_ASSERT(prevRemaining > 0);
-        if (prevRemaining == 1) {
-            // That was the last chunk to migrate.
-            m_workerStatus.fetchOr(1, turf::Relaxed);
-            goto endMigration;
+    // Iterate over all source tables.
+    for (ureg s = 0; s < m_numSources; s++) {
+        Source& source = getSources()[s];
+        // Loop over all migration units in this source table.
+        for (;;) {
+            if (m_workerStatus.load(turf::Relaxed) & 1) {
+                TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+                goto endMigration;
+            }
+            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+            if (startIdx >= source.table->sizeMask + 1)
+                break; // No more migration units in this table. Try next source table.
+            bool overflowed = !migrateRange(source.table, startIdx);
+            if (overflowed) {
+                // *** FAILED MIGRATION ***
+                // TableMigration failed due to destination table overflow.
+                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+                // hence m_unitsRemaining won't reach zero.
+                // However, multiple threads can independently detect a failed migration at the same time.
+                TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+                // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
+                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+                // the thread
+                // that deals with it.
+                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+                if (oldOverflowed)
+                    TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+                               uptr(oldOverflowed));
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
+            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+            TURF_ASSERT(prevRemaining > 0);
+            if (prevRemaining == 1) {
+                // *** SUCCESSFUL MIGRATION ***
+                // That was the last chunk to migrate.
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
         }
     }
-    TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
+    TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
 
 endMigration:
     // Decrement the shared # of workers.
@@ -357,16 +410,48 @@ endMigration:
         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
     if (probeStatus >= 4) {
         // There are other workers remaining. Return here so that only the very last worker will proceed.
-        TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+        TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
         return;
     }
 
     // We're the very last worker thread.
-    // Publish the new subtree.
+    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
     TURF_ASSERT(probeStatus == 3);
-    m_map.publishTableMigration(this);
-    // End the jobCoodinator.
-    m_source->jobCoordinator.end();
+    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+    if (!overflowed) {
+        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+        m_map.publishTableMigration(this);
+        // End the jobCoodinator.
+        getSources()[0].table->jobCoordinator.end();
+    } else {
+        // The migration failed due to the overflow of the destination table.
+        Table* origTable = getSources()[0].table;
+        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
+        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
+        if (checkedJob != this) {
+            TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+                       uptr(checkedJob));
+        } else {
+            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
+            // Double the destination table size.
+            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
+            // Transfer source tables to the new migration.
+            for (ureg i = 0; i < m_numSources; i++) {
+                migration->getSources()[i].table = getSources()[i].table;
+                getSources()[i].table = NULL;
+                migration->getSources()[i].sourceIndex.storeNonatomic(0);
+            }
+            migration->getSources()[m_numSources].table = m_destination;
+            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
+            // Calculate total number of migration units to move.
+            ureg unitsRemaining = 0;
+            for (ureg s = 0; s < migration->m_numSources; s++)
+                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
+            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
+            // Publish the new migration.
+            origTable->jobCoordinator.storeRelease(migration);
+        }
+    }
 
     // We're done with this TableMigration. Queue it for GC.
     DefaultQSBR.enqueue(&TableMigration::destroy, this);