- // Loop over all migration units in the source table.
- for (;;) {
- if (m_workerStatus.load(turf::Relaxed) & 1) {
- TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
- goto endMigration;
- }
- ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
- if (startIdx >= m_source->sizeMask + 1)
- break; // No more migration units.
- migrateRange(startIdx);
- sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
- TURF_ASSERT(prevRemaining > 0);
- if (prevRemaining == 1) {
- // That was the last chunk to migrate.
- m_workerStatus.fetchOr(1, turf::Relaxed);
- goto endMigration;
+ // Iterate over all source tables.
+ for (ureg s = 0; s < m_numSources; s++) {
+ Source& source = getSources()[s];
+ // Loop over all migration units in this source table.
+ for (;;) {
+ if (m_workerStatus.load(turf::Relaxed) & 1) {
+ TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
+ goto endMigration;
+ }
+ ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
+ if (startIdx >= source.table->sizeMask + 1)
+ break; // No more migration units in this table. Try next source table.
+ bool overflowed = !migrateRange(source.table, startIdx);
+ if (overflowed) {
+ // *** FAILED MIGRATION ***
+ // TableMigration failed due to destination table overflow.
+ // No other thread can declare the migration successful at this point, because *this* unit will never complete,
+ // hence m_unitsRemaining won't reach zero.
+ // However, multiple threads can independently detect a failed migration at the same time.
+ TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
+ // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
+ // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
+ // the thread
+ // that deals with it.
+ bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
+ if (oldOverflowed)
+ TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
+ uptr(oldOverflowed));
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }
+ sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
+ TURF_ASSERT(prevRemaining > 0);
+ if (prevRemaining == 1) {
+ // *** SUCCESSFUL MIGRATION ***
+ // That was the last chunk to migrate.
+ m_workerStatus.fetchOr(1, turf::Relaxed);
+ goto endMigration;
+ }