/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing

  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction

  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/

#ifndef JUNCTION_DETAILS_LEAPFROG_H
#define JUNCTION_DETAILS_LEAPFROG_H

#include <junction/Core.h>
#include <turf/Atomic.h>
#include <turf/Mutex.h>
#include <turf/ManualResetEvent.h>
#include <turf/Util.h>
#include <junction/MapTraits.h>
#include <turf/Trace.h>
#include <turf/Heap.h>
#include <junction/SimpleJobCoordinator.h>
#include <junction/QSBR.h>

// Enable this to force migration overflows (for test purposes):
#define JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS 0

namespace junction {
namespace details {

TURF_TRACE_DECLARE(LeapFrog, 33)

template <class Map>
struct LeapFrog {
    typedef typename Map::Hash Hash;
    typedef typename Map::Value Value;
    typedef typename Map::KeyTraits KeyTraits;
    typedef typename Map::ValueTraits ValueTraits;

    static const ureg InitialSize = 8;
    static const ureg TableMigrationUnitSize = 32;
    static const ureg LinearSearchLimit = 128;
    static const ureg CellsInUseSample = LinearSearchLimit;
    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::deltas (u8)
    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain

    struct Cell {
        turf::Atomic<Hash> hash;
        turf::Atomic<Value> value;
    };

    struct CellGroup {
        // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
        // Each cell in the probe chain is located within the table itself.
        // "deltas" determines the index of the next cell in the probe chain.
        // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
        // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
        // All subsequent cells in the chain are given by deltas 4 - 7. They are also guaranteed to belong in the bucket.
        turf::Atomic<u8> deltas[8];
        Cell cells[4];
    };
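    // Illustrative sketch (not part of the original source): how the delta links chain a bucket.
    // Suppose sizeMask == 15 and two keys hash to bucket 5. The first key occupies cell 5; the
    // second is placed by linear probing at, say, cell 9. The link from the hashed cell to the
    // first in-bucket cell lives in deltas[5 & 3] of cell 5's group, and stores 9 - 5 == 4:
    //
    //     ureg idx = 5;                                       // hashed cell
    //     u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
    //     idx = (idx + delta) & sizeMask;                     // follows the chain to cell 9
    //
    // Links from one in-bucket cell to the next use deltas[(idx & 3) + 4] instead, which is why
    // each group carries 8 delta slots for its 4 cells.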

    struct Table {
        const ureg sizeMask;                 // a power of two minus one
        turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration

        Table(ureg sizeMask) : sizeMask(sizeMask) {
        }

        static Table* create(ureg tableSize) {
            TURF_ASSERT(turf::util::isPowerOf2(tableSize));
            TURF_ASSERT(tableSize >= 4);
            ureg numGroups = tableSize >> 2;
            Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
            new (table) Table(tableSize - 1);
            for (ureg i = 0; i < numGroups; i++) {
                CellGroup* group = table->getCellGroups() + i;
                for (ureg j = 0; j < 4; j++) {
                    group->deltas[j].storeNonatomic(0);
                    group->deltas[j + 4].storeNonatomic(0);
                    group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
                    group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
                }
            }
            return table;
        }

        void destroy() {
            this->Table::~Table();
            TURF_HEAP.free(this);
        }

        CellGroup* getCellGroups() const {
            return (CellGroup*) (this + 1);
        }

        ureg getNumMigrationUnits() const {
            return sizeMask / TableMigrationUnitSize + 1;
        }
    };
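    // Worked example (illustrative, not part of the original source): Table::create(8) allocates
    // a single block holding the Table header followed by 8 >> 2 == 2 CellGroups (8 cells total)
    // and sets sizeMask == 7. getCellGroups() simply returns the address just past the header,
    // which is why the groups need no pointer of their own. With TableMigrationUnitSize == 32,
    // such a table is covered by 7 / 32 + 1 == 1 migration unit.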

    class TableMigration : public SimpleJobCoordinator::Job {
    public:
        struct Source {
            Table* table;
            turf::Atomic<ureg> sourceIndex;
        };

        Map& m_map;
        Table* m_destination;
        turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
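        // Note (derived from run() below, stated here for clarity): bit 0 of m_workerStatus is
        // the "end" flag and the remaining bits count active workers in increments of 2. For
        // example, a value of 5 means the end flag is set and two workers are still running.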
        turf::Atomic<bool> m_overflowed;
        turf::Atomic<sreg> m_unitsRemaining;
        ureg m_numSources;

        TableMigration(Map& map) : m_map(map) {
        }

        static TableMigration* create(Map& map, ureg numSources) {
            TableMigration* migration =
                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
            new (migration) TableMigration(map);
            migration->m_workerStatus.storeNonatomic(0);
            migration->m_overflowed.storeNonatomic(false);
            migration->m_unitsRemaining.storeNonatomic(0);
            migration->m_numSources = numSources;
            // Caller is responsible for filling in sources & destination
            return migration;
        }

        virtual ~TableMigration() TURF_OVERRIDE {
        }

        void destroy() {
            // Destroy all source tables.
            for (ureg i = 0; i < m_numSources; i++)
                if (getSources()[i].table)
                    getSources()[i].table->destroy();
            // Delete the migration object itself.
            this->TableMigration::~TableMigration();
            TURF_HEAP.free(this);
        }

        Source* getSources() const {
            return (Source*) (this + 1);
        }

        bool migrateRange(Table* srcTable, ureg startIdx);
        virtual void run() TURF_OVERRIDE;
    };

    static Cell* find(Hash hash, Table* table) {
        TURF_TRACE(LeapFrog, 0, "[find] called", uptr(table), hash);
        TURF_ASSERT(table);
        TURF_ASSERT(hash != KeyTraits::NullHash);
        ureg sizeMask = table->sizeMask;
        // Optimistically check hashed cell even though it might belong to another bucket
        ureg idx = hash & sizeMask;
        CellGroup* group = table->getCellGroups() + (idx >> 2);
        Cell* cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(turf::Relaxed);
        if (probeHash == hash) {
            TURF_TRACE(LeapFrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
            return cell;
        } else if (probeHash == KeyTraits::NullHash) {
            return NULL;
        }
        // Follow probe chain for our bucket
        u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
        while (delta) {
            idx = (idx + delta) & sizeMask;
            group = table->getCellGroups() + (idx >> 2);
            cell = group->cells + (idx & 3);
            Hash probeHash = cell->hash.load(turf::Relaxed);
            // Note: probeHash might still be NullHash due to memory reordering of a concurrent insert,
            // but we don't check for it. We just follow the probe chain.
            if (probeHash == hash) {
                TURF_TRACE(LeapFrog, 2, "[find] found existing cell", uptr(table), idx);
                return cell;
            }
            delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
        }
        // End of probe chain, not found
        return NULL;
    }
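    // Usage sketch (illustrative, not part of the original source): a reader such as a
    // hypothetical Map::get() would call find() and then inspect the cell's value, treating
    // Redirect as "look in the newer table":
    //
    //     Cell* cell = LeapFrog<Map>::find(hash, table);
    //     Value value = cell ? cell->value.load(turf::Consume) : Value(ValueTraits::NullValue);
    //     if (value == Value(ValueTraits::Redirect)) { /* help migrate, then retry in new table */ }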

    // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
    enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
    static InsertResult insert(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
        TURF_TRACE(LeapFrog, 3, "[insert] called", uptr(table), hash);
        TURF_ASSERT(table);
        TURF_ASSERT(hash != KeyTraits::NullHash);
        ureg sizeMask = table->sizeMask;
        ureg idx = ureg(hash);

        // Check hashed cell first, though it may not even belong to the bucket.
        CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
        cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(turf::Relaxed);
        if (probeHash == KeyTraits::NullHash) {
            if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
                TURF_TRACE(LeapFrog, 4, "[insert] reserved first cell", uptr(table), idx);
                // There are no links to set. We're done.
                return InsertResult_InsertedNew;
            } else {
                TURF_TRACE(LeapFrog, 5, "[insert] race to reserve first cell", uptr(table), idx);
                // Fall through to check if it was the same hash...
            }
        }
        if (probeHash == hash) {
            TURF_TRACE(LeapFrog, 6, "[insert] found in first cell", uptr(table), idx);
            return InsertResult_AlreadyFound;
        }

        // Follow the link chain for this bucket.
        ureg maxIdx = idx + sizeMask;
        ureg linkLevel = 0;
        turf::Atomic<u8>* prevLink;
        for (;;) {
        followLink:
            prevLink = group->deltas + ((idx & 3) + linkLevel);
            linkLevel = 4;
            u8 probeDelta = prevLink->load(turf::Relaxed);
            if (probeDelta) {
                idx += probeDelta;
                // Check the hash for this cell.
                group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                cell = group->cells + (idx & 3);
                probeHash = cell->hash.load(turf::Relaxed);
                if (probeHash == KeyTraits::NullHash) {
                    // Cell was linked, but hash is not visible yet.
                    // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
                    // just poll until it becomes visible.
                    TURF_TRACE(LeapFrog, 7, "[insert] race to read hash", uptr(table), idx);
                    do {
                        probeHash = cell->hash.load(turf::Acquire);
                    } while (probeHash == KeyTraits::NullHash);
                }
                TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
                if (probeHash == hash) {
                    TURF_TRACE(LeapFrog, 8, "[insert] found in probe chain", uptr(table), idx);
                    return InsertResult_AlreadyFound;
                }
            } else {
                // Reached the end of the link chain for this bucket.
                // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
                ureg prevLinkIdx = idx;
                TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
                ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
                while (linearProbesRemaining-- > 0) {
                    idx++;
                    group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                    cell = group->cells + (idx & 3);
                    probeHash = cell->hash.load(turf::Relaxed);
                    if (probeHash == KeyTraits::NullHash) {
                        // It's an empty cell. Try to reserve it.
                        if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
                            // Success. We've reserved the cell. Link it to previous cell in same bucket.
                            TURF_TRACE(LeapFrog, 9, "[insert] reserved cell", uptr(table), idx);
                            TURF_ASSERT(probeDelta == 0);
                            u8 desiredDelta = idx - prevLinkIdx;
#if TURF_WITH_ASSERTS
                            probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
                            TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
#else
                            prevLink->store(desiredDelta, turf::Relaxed);
#endif
                            return InsertResult_InsertedNew;
                        } else {
                            TURF_TRACE(LeapFrog, 10, "[insert] race to reserve cell", uptr(table), idx);
                            // Fall through to check if it's the same hash...
                        }
                    }
                    Hash x = (probeHash ^ hash);
                    // Check for same hash.
                    if (!x) {
                        TURF_TRACE(LeapFrog, 11, "[insert] found outside probe chain", uptr(table), idx);
                        return InsertResult_AlreadyFound;
                    }
                    // Check for same bucket.
                    if ((x & sizeMask) == 0) {
                        TURF_TRACE(LeapFrog, 12, "[insert] found late-arriving cell in same bucket", uptr(table), idx);
                        // Attempt to set the link on behalf of the late-arriving cell.
                        // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
                        // there's no guarantee that our own link chain will be well-formed by the time this function returns.
                        // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
                        u8 desiredDelta = idx - prevLinkIdx;
#if TURF_WITH_ASSERTS
                        probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
                        TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
                        if (probeDelta == 0)
                            TURF_TRACE(LeapFrog, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
#else
                        prevLink->store(desiredDelta, turf::Relaxed);
#endif
                        goto followLink; // Try to follow link chain for the bucket again.
                    }
                    // Continue linear search...
                }
                // Table is too full to insert.
                overflowIdx = idx + 1;
                TURF_TRACE(LeapFrog, 14, "[insert] overflow", uptr(table), overflowIdx);
                return InsertResult_Overflow;
            }
        }
    }
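    // Usage sketch (illustrative, not part of the original source): a writer such as a
    // hypothetical Map::assign() would dispatch on the three results roughly like this:
    //
    //     Cell* cell;
    //     ureg overflowIdx;
    //     switch (LeapFrog<Map>::insert(hash, table, cell, overflowIdx)) {
    //     case InsertResult_InsertedNew:   // new cell reserved; store the value into it
    //     case InsertResult_AlreadyFound:  // key exists; update (or keep) the existing value
    //         break;
    //     case InsertResult_Overflow:      // table too full; kick off a migration and retry
    //         beginTableMigration(map, table, overflowIdx);
    //         break;
    //     }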

    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
        // Create new migration by DCLI (double-checked locking idiom).
        TURF_TRACE(LeapFrog, 15, "[beginTableMigrationToSize] called", 0, 0);
        SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
        if (job) {
            TURF_TRACE(LeapFrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
        } else {
            turf::LockGuard<turf::Mutex> guard(table->mutex);
            job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
            if (job) {
                TURF_TRACE(LeapFrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
            } else {
                // Create new migration.
                TableMigration* migration = TableMigration::create(map, 1);
                migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
                migration->getSources()[0].table = table;
                migration->getSources()[0].sourceIndex.storeNonatomic(0);
                migration->m_destination = Table::create(nextTableSize);
                // Publish the new migration.
                table->jobCoordinator.storeRelease(migration);
            }
        }
    }

    static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
        // Estimate number of cells in use based on a small sample.
        ureg sizeMask = table->sizeMask;
        ureg idx = overflowIdx - CellsInUseSample;
        ureg inUseCells = 0;
        for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
            CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
            Cell* cell = group->cells + (idx & 3);
            Value value = cell->value.load(turf::Relaxed);
            if (value == Value(ValueTraits::Redirect)) {
                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
                TURF_TRACE(LeapFrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
                return;
            }
            if (value != Value(ValueTraits::NullValue))
                inUseCells++;
            idx++;
        }
        float inUseRatio = float(inUseCells) / CellsInUseSample;
        float estimatedInUse = (sizeMask + 1) * inUseRatio;
#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
        // Periodically underestimate the number of cells in use.
        // This exercises the code that handles overflow during migration.
        static ureg counter = 1;
        if ((++counter & 3) == 0) {
            estimatedInUse /= 4;
        }
#endif
        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
        beginTableMigrationToSize(map, table, nextTableSize);
    }
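    // Worked example (illustrative, not part of the original source): suppose a table with
    // sizeMask == 2047 overflows, and 48 of the CellsInUseSample == 128 sampled cells hold live
    // values. Then inUseRatio == 48 / 128.0f == 0.375, estimatedInUse == 2048 * 0.375 == 768, and
    // nextTableSize == roundUpPowerOf2(1536) == 2048: the table is rebuilt at the same size,
    // which is the right call when the overflow was caused by deleted cells rather than growth.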
}; // LeapFrog

template <class Map>
bool LeapFrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
    ureg srcSizeMask = srcTable->sizeMask;
    ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
    // Iterate over source range.
    for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
        CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
        Cell* srcCell = srcGroup->cells + (srcIdx & 3);
        Hash srcHash;
        Value srcValue;
        // Fetch the srcHash and srcValue.
        for (;;) {
            srcHash = srcCell->hash.load(turf::Relaxed);
            if (srcHash == KeyTraits::NullHash) {
                // An unused cell. Try to put a Redirect marker in its value.
                srcValue =
                    srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
                if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    TURF_TRACE(LeapFrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
                    break;
                }
                if (srcValue == Value(ValueTraits::NullValue))
                    break; // Redirect has been placed. Break inner loop, continue outer loop.
                TURF_TRACE(LeapFrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
                // Otherwise, somebody just claimed the cell. Read srcHash again...
            } else {
                // Check for deleted/uninitialized value.
                srcValue = srcCell->value.load(turf::Relaxed);
                if (srcValue == Value(ValueTraits::NullValue)) {
                    // Try to put a Redirect marker.
                    if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
                        break; // Redirect has been placed. Break inner loop, continue outer loop.
                    TURF_TRACE(LeapFrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
                    if (srcValue == Value(ValueTraits::Redirect)) {
                        // FIXME: I don't think this will happen. Investigate & change to assert
                        TURF_TRACE(LeapFrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
                        break;
                    }
                } else if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    TURF_TRACE(LeapFrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
                    break;
                }

                // We've got a key/value pair to migrate.
                // Reserve a cell in the destination table.
                TURF_ASSERT(srcHash != KeyTraits::NullHash);
                TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
                Cell* dstCell;
                ureg overflowIdx;
                InsertResult result = insert(srcHash, m_destination, dstCell, overflowIdx);
                // During migration, a hash can only exist in one place among all the source tables,
                // and it is only migrated by one thread. Therefore, the hash will never already exist
                // in the destination table:
                TURF_ASSERT(result != InsertResult_AlreadyFound);
                if (result == InsertResult_Overflow) {
                    // Destination overflow.
                    // This can happen for several reasons. For example, the source table could have
                    // consisted mostly of deleted cells when it overflowed, resulting in a small
                    // destination table size, but then another thread could re-insert all the same
                    // hashes before the migration completed.
                    // Caller will cancel the current migration and begin a new one.
                    return false;
                }
                // Migrate the old value to the new cell.
                for (;;) {
                    // Copy srcValue to the destination.
                    dstCell->value.store(srcValue, turf::Relaxed);
                    // Try to place a Redirect marker in srcValue.
                    Value doubleCheckedSrcValue =
                        srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
                    TURF_ASSERT(doubleCheckedSrcValue !=
                                Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
                    if (doubleCheckedSrcValue == srcValue) {
                        // No racing writes to the src. We've successfully placed the Redirect marker.
                        // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                        // by a late-arriving erase.
                        if (srcValue == Value(ValueTraits::NullValue))
                            TURF_TRACE(LeapFrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
                        break;
                    }
                    // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
                    TURF_TRACE(LeapFrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
                    srcValue = doubleCheckedSrcValue;
                }
                // Cell successfully migrated. Proceed to next source cell.
                break;
            }
        }
    }
    // Range has been migrated successfully.
    return true;
}
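// Timeline sketch (illustrative, not part of the original source) of the copy/Redirect loop above
// racing with a concurrent writer:
//
//     migrator: dstCell->value.store(v1)             // copy the observed value to the destination
//     writer:   srcCell->value = v2                  // late-arriving update to the source cell
//     migrator: CAS(srcCell->value, v1 -> Redirect)  // fails, returns v2
//     migrator: dstCell->value.store(v2)             // re-copy the newer value
//     migrator: CAS(srcCell->value, v2 -> Redirect)  // succeeds; writers now chase the Redirect
//
// The loop terminates because each retry re-copies the value that beat the CAS, and once the
// Redirect marker lands, no further writes to the source cell are possible.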

template <class Map>
void LeapFrog<Map>::TableMigration::run() {
    // Conditionally increment the shared # of workers.
    ureg probeStatus = m_workerStatus.load(turf::Relaxed);
    do {
        if (probeStatus & 1) {
            // End flag is already set, so do nothing.
            TURF_TRACE(LeapFrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
            return;
        }
    } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
    // # of workers has been incremented, and the end flag is clear.
    TURF_ASSERT((probeStatus & 1) == 0);

    // Iterate over all source tables.
    for (ureg s = 0; s < m_numSources; s++) {
        Source& source = getSources()[s];
        // Loop over all migration units in this source table.
        for (;;) {
            if (m_workerStatus.load(turf::Relaxed) & 1) {
                TURF_TRACE(LeapFrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
                goto endMigration;
            }
            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
            if (startIdx >= source.table->sizeMask + 1)
                break; // No more migration units in this table. Try next source table.
            bool overflowed = !migrateRange(source.table, startIdx);
            if (overflowed) {
                // *** FAILED MIGRATION ***
                // TableMigration failed due to destination table overflow.
                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
                // hence m_unitsRemaining won't reach zero.
                // However, multiple threads can independently detect a failed migration at the same time.
                TURF_TRACE(LeapFrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
                // We store overflowed in a shared variable because all the worker threads must be
                // flushed before the overflow can be dealt with safely. Therefore, the thread that
                // detects the failure is often not the thread that ultimately deals with it.
                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
                if (oldOverflowed)
                    TURF_TRACE(LeapFrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
                               uptr(oldOverflowed));
                m_workerStatus.fetchOr(1, turf::Relaxed);
                goto endMigration;
            }
            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
            TURF_ASSERT(prevRemaining > 0);
            if (prevRemaining == 1) {
                // *** SUCCESSFUL MIGRATION ***
                // That was the last chunk to migrate.
                m_workerStatus.fetchOr(1, turf::Relaxed);
                goto endMigration;
            }
        }
    }
    TURF_TRACE(LeapFrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);

endMigration:
    // Decrement the shared # of workers.
    probeStatus = m_workerStatus.fetchSub(
        2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
    if (probeStatus >= 4) {
        // There are other workers remaining. Return here so that only the very last worker will proceed.
        TURF_TRACE(LeapFrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
        return;
    }

    // We're the very last worker thread.
    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
    TURF_ASSERT(probeStatus == 3);
    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
    if (!overflowed) {
        // The migration succeeded. This is the most likely outcome. Publish the new table.
        m_map.publishTableMigration(this);
        // End the jobCoordinator.
        getSources()[0].table->jobCoordinator.end();
    } else {
        // The migration failed due to the overflow of the destination table.
        Table* origTable = getSources()[0].table;
        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
        if (checkedJob != this) {
            TURF_TRACE(LeapFrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
                       uptr(checkedJob));
        } else {
            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
            // Double the destination table size.
            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
            // Transfer source tables to the new migration.
            for (ureg i = 0; i < m_numSources; i++) {
                migration->getSources()[i].table = getSources()[i].table;
                getSources()[i].table = NULL;
                migration->getSources()[i].sourceIndex.storeNonatomic(0);
            }
            migration->getSources()[m_numSources].table = m_destination;
            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
            // Calculate total number of migration units to move.
            ureg unitsRemaining = 0;
            for (ureg s = 0; s < migration->m_numSources; s++)
                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
            // Publish the new migration.
            origTable->jobCoordinator.storeRelease(migration);
        }
    }

    // We're done with this TableMigration. Queue it for GC.
    DefaultQSBR.enqueue(&TableMigration::destroy, this);
}
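// Worked example (illustrative, not part of the original source) of the m_workerStatus
// bookkeeping in run(): two threads join an in-progress migration, so the status goes
// 0 -> 2 -> 4. The thread that migrates the final unit sets the end flag (4 -> 5), then both
// threads decrement by 2 on the way out. The first leaver's fetchSub returns the old value 5
// (>= 4), so it returns early; the last leaver's fetchSub returns 3 (end flag + itself), so it
// alone performs the post-migration step.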

} // namespace details
} // namespace junction

#endif // JUNCTION_DETAILS_LEAPFROG_H