Improve ConcurrentMap_Linear scalability
[junction.git] / junction / details / Linear.h
index 991f853f708f6e1a528c91ca47e5848fe9d42821..40fc450980b62cdc8212768934a9f0b725e76abd 100644 (file)
 #include <junction/SimpleJobCoordinator.h>
 #include <junction/QSBR.h>
 
+// Enable this to force migration overflows (for test purposes):
+#define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
+
 namespace junction {
 namespace details {
 
-TURF_TRACE_DECLARE(Linear, 22)
+TURF_TRACE_DECLARE(Linear, 26)
 
 template <class Map>
 struct Linear {
@@ -38,10 +41,7 @@ struct Linear {
 
     static const ureg InitialSize = 8;
     static const ureg TableMigrationUnitSize = 32;
-    static const ureg LinearSearchLimit = 128;
-    static const ureg CellsInUseSample = LinearSearchLimit;
-    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::links
-    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
+    static const ureg CellsInUseSample = 256;
 
     struct Cell {
         turf::Atomic<Hash> hash;
@@ -50,21 +50,17 @@ struct Linear {
 
     struct Table {
         const ureg sizeMask; // a power of two minus one
-        const ureg limitNumValues;
         turf::Atomic<sreg> cellsRemaining;
-        turf::Atomic<sreg> valuesRemaining;
         turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
 
-        Table(ureg sizeMask, ureg limitNumValues)
-            : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
-              valuesRemaining(limitNumValues) {
+        Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
         }
 
-        static Table* create(ureg tableSize, ureg limitNumValues) {
+        static Table* create(ureg tableSize) {
             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
-            new (table) Table(tableSize - 1, limitNumValues);
+            new (table) Table(tableSize - 1);
             for (ureg j = 0; j < tableSize; j++) {
                 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
                 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
@@ -88,27 +84,51 @@ struct Linear {
 
     class TableMigration : public SimpleJobCoordinator::Job {
     public:
+        struct Source {
+            Table* table;
+            turf::Atomic<ureg> sourceIndex;
+        };
+
         Map& m_map;
-        Table* m_source;
-        turf::Atomic<ureg> m_sourceIndex;
         Table* m_destination;
         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
+        turf::Atomic<bool> m_overflowed;
         turf::Atomic<sreg> m_unitsRemaining;
+        ureg m_numSources;
+
+        TableMigration(Map& map) : m_map(map) {
+        }
 
-        TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
-            // Caller is responsible for filling in source & destination
+        static TableMigration* create(Map& map, ureg numSources) {
+            TableMigration* migration =
+                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
+            new (migration) TableMigration(map);
+            migration->m_workerStatus.storeNonatomic(0);
+            migration->m_overflowed.storeNonatomic(false);
+            migration->m_unitsRemaining.storeNonatomic(0);
+            migration->m_numSources = numSources;
+            // Caller is responsible for filling in sources & destination
+            return migration;
         }
 
         virtual ~TableMigration() TURF_OVERRIDE {
-            // Destroy source table.
-            m_source->destroy();
         }
 
         void destroy() {
-            delete this;
+            // Destroy all source tables.
+            for (ureg i = 0; i < m_numSources; i++)
+                if (getSources()[i].table)
+                    getSources()[i].table->destroy();
+            // Delete the migration object itself.
+            this->TableMigration::~TableMigration();
+            TURF_HEAP.free(this);
+        }
+
+        Source* getSources() const {
+            return (Source*) (this + 1);
         }
 
-        bool migrateRange(ureg startIdx);
+        bool migrateRange(Table* srcTable, ureg startIdx);
         virtual void run() TURF_OVERRIDE;
     };
 
@@ -150,7 +170,7 @@ struct Linear {
             }
             if (probeHash == KeyTraits::NullHash) {
                 // It's an empty cell. Try to reserve it.
-                // But first, decrement cellsRemaining to ensure we have permission to create new getCells().
+                // But first, decrement cellsRemaining to ensure we have permission to create new cells.
                 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
                 if (prevCellsRemaining <= 0) {
                     // Table is overpopulated.
@@ -177,71 +197,69 @@ struct Linear {
         }
     }
 
-    static void beginTableMigration(Map& map, Table* table) {
+    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
         // Create new migration by DCLI.
-        TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0);
+        TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
         if (job) {
-            TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
+            TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
         } else {
             turf::LockGuard<turf::Mutex> guard(table->mutex);
             job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
             if (job) {
-                TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
+                TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
             } else {
-                // Determine new migration size and cap the number of values that can be added concurrent to the migration.
-                sreg oldValuesLimit = table->limitNumValues;
-                sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed);
-                sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining;
-            calculateNextTableSize:
-                sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2);
-                sreg nextLimitNumValues = nextTableSize * 3 / 4;
-                if (nextLimitNumValues < oldValuesLimit) {
-                    // Set the new limitNumValues on the *current* table.
-                    // This prevents other threads, while the migration is in progress, from concurrently
-                    // re-inserting more values than the new table can hold.
-                    // To set the new limitNumValues on the current table in an atomic fashion,
-                    // we update its valuesRemaining via CAS loop:
-                    for (;;) {
-                        // We must recalculate desiredValuesRemaining on each iteration of the CAS loop
-                        oldValuesInUse = oldValuesLimit - oldValuesRemaining;
-                        sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
-                        if (desiredValuesRemaining < 0) {
-                            TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
-                                       desiredValuesRemaining);
-                            // Must recalculate nextTableSize. Goto, baby!
-                            goto calculateNextTableSize;
-                        }
-                        if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
-                                                                       turf::Relaxed))
-                            break; // Success!
-                        // CAS failed because table->valuesRemaining was modified by another thread.
-                        // An updated value has been reloaded into oldValuesRemaining (modified by reference).
-                        // Recalculate desiredValuesRemaining to account for the updated value, and try again.
-                        TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining);
-                    }
-                }
-                // Now we are assured that the new table will not become overpopulated during the migration process.
                 // Create new migration.
-                TableMigration* migration = new TableMigration(map);
-                migration->m_source = table;
-                migration->m_destination = Table::create(nextTableSize, nextLimitNumValues);
+                TableMigration* migration = TableMigration::create(map, 1);
                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
+                migration->getSources()[0].table = table;
+                migration->getSources()[0].sourceIndex.storeNonatomic(0);
+                migration->m_destination = Table::create(nextTableSize);
                 // Publish the new migration.
                 table->jobCoordinator.storeRelease(migration);
             }
         }
     }
+
+    static void beginTableMigration(Map& map, Table* table) {
+        // Estimate number of cells in use based on a small sample.
+        ureg idx = 0;
+        ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
+        ureg inUseCells = 0;
+        for (; idx < sampleSize; idx++) {
+            Cell* cell = table->getCells() + idx;
+            Value value = cell->value.load(turf::Relaxed);
+            if (value == Value(ValueTraits::Redirect)) {
+                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
+                TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
+                return;
+            }
+            if (value != Value(ValueTraits::NullValue))
+                inUseCells++;
+        }
+        float inUseRatio = float(inUseCells) / sampleSize;
+        float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
+#if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
+        // Periodically underestimate the number of cells in use.
+        // This exercises the code that handles overflow during migration.
+        static ureg counter = 1;
+        if ((++counter & 3) == 0) {
+            estimatedInUse /= 4;
+        }
+#endif
+        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
+        beginTableMigrationToSize(map, table, nextTableSize);
+    }
 }; // Linear
 
 template <class Map>
-bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
-    ureg srcSizeMask = m_source->sizeMask;
+bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
+    ureg srcSizeMask = srcTable->sizeMask;
     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
     sreg valuesMigrated = 0;
     // Iterate over source range.
     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
-        Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask);
+        Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
         Hash srcHash;
         Value srcValue;
         // Fetch the srcHash and srcValue.
@@ -253,12 +271,12 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
                 if (srcValue == Value(ValueTraits::Redirect)) {
                     // srcValue is already marked Redirect due to previous incomplete migration.
-                    TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
                     break;
                 }
                 if (srcValue == Value(ValueTraits::NullValue))
                     break; // Redirect has been placed. Break inner loop, continue outer loop.
-                TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
+                TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
                 // Otherwise, somebody just claimed the cell. Read srcHash again...
             } else {
                 // Check for deleted/uninitialized value.
@@ -267,21 +285,38 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                     // Try to put a Redirect marker.
                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
                         break; // Redirect has been placed. Break inner loop, continue outer loop.
-                    TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
+                    if (srcValue == Value(ValueTraits::Redirect)) {
+                        // FIXME: I don't think this will happen. Investigate & change to assert
+                        TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
+                        break;
+                    }
+                } else if (srcValue == Value(ValueTraits::Redirect)) {
+                    // srcValue is already marked Redirect due to previous incomplete migration.
+                    TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
+                    break;
                 }
 
                 // We've got a key/value pair to migrate.
                 // Reserve a destination cell in the destination.
                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
-                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
+                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
                 Cell* dstCell;
                 InsertResult result = insert(srcHash, m_destination, dstCell);
                 // During migration, a hash can only exist in one place among all the source tables,
                 // and it is only migrated by one thread. Therefore, the hash will never already exist
                 // in the destination table:
                 TURF_ASSERT(result != InsertResult_AlreadyFound);
-                TURF_ASSERT(result != InsertResult_Overflow);
+                if (result == InsertResult_Overflow) {
+                    // Destination overflow.
+                    // This can happen for several reasons. For example, the source table could have
+                    // consisted entirely of deleted cells when it overflowed, resulting in a small destination
+                    // table size, but then another thread could re-insert all the same hashes
+                    // before the migration completed.
+                    // Caller will cancel the current migration and begin a new one.
+                    return false;
+                }
                 // Migrate the old value to the new cell.
                 for (;;) {
                     // Copy srcValue to the destination.
@@ -296,13 +331,11 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                         // by a late-arriving erase.
                         if (srcValue == Value(ValueTraits::NullValue))
-                            TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
-                        else
-                            valuesMigrated++;
+                            TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
                         break;
                     }
                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
-                    TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
+                    TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
                     srcValue = doubleCheckedSrcValue;
                 }
                 // Cell successfully migrated. Proceed to next source cell.
@@ -310,9 +343,6 @@ bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
             }
         }
     }
-    sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
-    TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
-    TURF_UNUSED(prevValuesRemaining);
     // Range has been migrated successfully.
     return true;
 }
@@ -324,32 +354,55 @@ void Linear<Map>::TableMigration::run() {
     do {
         if (probeStatus & 1) {
             // End flag is already set, so do nothing.
-            TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
+            TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
             return;
         }
     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
     // # of workers has been incremented, and the end flag is clear.
     TURF_ASSERT((probeStatus & 1) == 0);
 
-    // Loop over all migration units in the source table.
-    for (;;) {
-        if (m_workerStatus.load(turf::Relaxed) & 1) {
-            TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
-            goto endMigration;
-        }
-        ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
-        if (startIdx >= m_source->sizeMask + 1)
-            break; // No more migration units.
-        migrateRange(startIdx);
-        sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
-        TURF_ASSERT(prevRemaining > 0);
-        if (prevRemaining == 1) {
-            // That was the last chunk to migrate.
-            m_workerStatus.fetchOr(1, turf::Relaxed);
-            goto endMigration;
+    // Iterate over all source tables.
+    for (ureg s = 0; s < m_numSources; s++) {
+        Source& source = getSources()[s];
+        // Loop over all migration units in this source table.
+        for (;;) {
+            if (m_workerStatus.load(turf::Relaxed) & 1) {
+                TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+                goto endMigration;
+            }
+            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+            if (startIdx >= source.table->sizeMask + 1)
+                break; // No more migration units in this table. Try next source table.
+            bool overflowed = !migrateRange(source.table, startIdx);
+            if (overflowed) {
+                // *** FAILED MIGRATION ***
+                // TableMigration failed due to destination table overflow.
+                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+                // hence m_unitsRemaining won't reach zero.
+                // However, multiple threads can independently detect a failed migration at the same time.
+                TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+                // We store overflowed in a shared variable because we must flush all the worker threads
+                // before we can safely deal with the overflow. Therefore, the thread that detects the
+                // failure is often different from the thread that deals with it: only the last worker
+                // to finish handles the overflow.
+                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+                if (oldOverflowed)
+                    TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+                               uptr(oldOverflowed));
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
+            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+            TURF_ASSERT(prevRemaining > 0);
+            if (prevRemaining == 1) {
+                // *** SUCCESSFUL MIGRATION ***
+                // That was the last chunk to migrate.
+                m_workerStatus.fetchOr(1, turf::Relaxed);
+                goto endMigration;
+            }
         }
     }
-    TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
+    TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
 
 endMigration:
     // Decrement the shared # of workers.
@@ -357,16 +410,48 @@ endMigration:
         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
     if (probeStatus >= 4) {
         // There are other workers remaining. Return here so that only the very last worker will proceed.
-        TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
+        TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
         return;
     }
 
     // We're the very last worker thread.
-    // Publish the new subtree.
+    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
     TURF_ASSERT(probeStatus == 3);
-    m_map.publishTableMigration(this);
-    // End the jobCoodinator.
-    m_source->jobCoordinator.end();
+    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
+    if (!overflowed) {
+        // The migration succeeded. This is the most likely outcome. Publish the new subtree.
+        m_map.publishTableMigration(this);
+        // End the jobCoordinator.
+        getSources()[0].table->jobCoordinator.end();
+    } else {
+        // The migration failed due to the overflow of the destination table.
+        Table* origTable = getSources()[0].table;
+        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
+        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
+        if (checkedJob != this) {
+            TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
+                       uptr(checkedJob));
+        } else {
+            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
+            // Double the destination table size.
+            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
+            // Transfer source tables to the new migration.
+            for (ureg i = 0; i < m_numSources; i++) {
+                migration->getSources()[i].table = getSources()[i].table;
+                getSources()[i].table = NULL;
+                migration->getSources()[i].sourceIndex.storeNonatomic(0);
+            }
+            migration->getSources()[m_numSources].table = m_destination;
+            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
+            // Calculate total number of migration units to move.
+            ureg unitsRemaining = 0;
+            for (ureg s = 0; s < migration->m_numSources; s++)
+                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
+            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
+            // Publish the new migration.
+            origTable->jobCoordinator.storeRelease(migration);
+        }
+    }
 
     // We're done with this TableMigration. Queue it for GC.
     DefaultQSBR.enqueue(&TableMigration::destroy, this);