/*------------------------------------------------------------------------
  Junction: Concurrent data structures in C++
  Copyright (c) 2016 Jeff Preshing

  Distributed under the Simplified BSD License.
  Original location: https://github.com/preshing/junction

  This software is distributed WITHOUT ANY WARRANTY; without even the
  implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
  See the LICENSE file for more information.
------------------------------------------------------------------------*/

#ifndef JUNCTION_DETAILS_LEAPFROG_H
#define JUNCTION_DETAILS_LEAPFROG_H

#include <junction/Core.h>
#include <turf/Atomic.h>
#include <turf/Mutex.h>
#include <turf/ManualResetEvent.h>
#include <turf/Util.h>
#include <junction/MapTraits.h>
#include <turf/Trace.h>
#include <turf/Heap.h>
#include <junction/SimpleJobCoordinator.h>
#include <junction/QSBR.h>

namespace junction {
namespace details {

TURF_TRACE_DECLARE(Leapfrog, 33)

template <class Map>
struct Leapfrog {
    typedef typename Map::Hash Hash;
    typedef typename Map::Value Value;
    typedef typename Map::KeyTraits KeyTraits;
    typedef typename Map::ValueTraits ValueTraits;

    static const ureg InitialSize = 8;
    static const ureg TableMigrationUnitSize = 32;
    static const ureg LinearSearchLimit = 128;
    static const ureg CellsInUseSample = LinearSearchLimit;
    TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::deltas
    TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain

    struct Cell {
        turf::Atomic<Hash> hash;
        turf::Atomic<Value> value;
    };

    struct CellGroup {
        // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
        // Each cell in the probe chain is located within the table itself.
        // "deltas" determines the index of the next cell in the probe chain.
        // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
        // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
        // All subsequent cells in the chain are given by deltas 4 - 7. Also guaranteed to belong in the bucket.
        turf::Atomic<u8> deltas[8];
        Cell cells[4];
    };
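
    // Illustrative example (an assumption for exposition, not part of the original source):
    // suppose sizeMask == 15 and a key hashes to index 2. The chain starts at cell 2.
    // If deltas[2] == 3, the second cell in the chain is at index (2 + 3) & 15 == 5; from
    // there, deltas[(5 & 3) + 4] in cell 5's group gives the offset to the next member,
    // and a delta of 0 terminates the chain. Since deltas are u8, linked cells can be at
    // most 255 slots apart, which is why LinearSearchLimit must stay below 256.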

    struct Table {
        const ureg sizeMask;                 // a power of two minus one
        turf::Mutex mutex;                   // guards double-checked initialization (DCLI) of the TableMigration (stored in the jobCoordinator)
        SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration

        Table(ureg sizeMask) : sizeMask(sizeMask) {
        }

        static Table* create(ureg tableSize) {
            TURF_ASSERT(turf::util::isPowerOf2(tableSize));
            TURF_ASSERT(tableSize >= 4);
            ureg numGroups = tableSize >> 2;
            Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
            new (table) Table(tableSize - 1);
            for (ureg i = 0; i < numGroups; i++) {
                CellGroup* group = table->getCellGroups() + i;
                for (ureg j = 0; j < 4; j++) {
                    group->deltas[j].storeNonatomic(0);
                    group->deltas[j + 4].storeNonatomic(0);
                    group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
                    group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
                }
            }
            return table;
        }

        void destroy() {
            this->Table::~Table();
            TURF_HEAP.free(this);
        }

        CellGroup* getCellGroups() const {
            return (CellGroup*) (this + 1);
        }

        ureg getNumMigrationUnits() const {
            return sizeMask / TableMigrationUnitSize + 1;
        }
    };
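
    // Layout note: Table::create makes a single allocation with numGroups CellGroups laid
    // out immediately after the Table header, which is why getCellGroups() simply returns
    // (CellGroup*) (this + 1). For example, a 16-cell table occupies
    // sizeof(Table) + 4 * sizeof(CellGroup) bytes in one block.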

    class TableMigration : public SimpleJobCoordinator::Job {
    public:
        struct Source {
            Table* table;
            turf::Atomic<ureg> sourceIndex;
        };

        Map& m_map;
        Table* m_destination;
        turf::Atomic<ureg> m_workerStatus; // 2 * (number of active workers), with the end flag in the low bit
        turf::Atomic<bool> m_overflowed;
        turf::Atomic<sreg> m_unitsRemaining;
        ureg m_numSources;

        TableMigration(Map& map) : m_map(map) {
        }

        static TableMigration* create(Map& map, ureg numSources) {
            TableMigration* migration =
                (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
            new (migration) TableMigration(map);
            migration->m_workerStatus.storeNonatomic(0);
            migration->m_overflowed.storeNonatomic(false);
            migration->m_unitsRemaining.storeNonatomic(0);
            migration->m_numSources = numSources;
            // Caller is responsible for filling in sources & destination
            return migration;
        }

        virtual ~TableMigration() TURF_OVERRIDE {
        }

        void destroy() {
            // Destroy all source tables.
            for (ureg i = 0; i < m_numSources; i++)
                if (getSources()[i].table)
                    getSources()[i].table->destroy();
            // Delete the migration object itself.
            this->TableMigration::~TableMigration();
            TURF_HEAP.free(this);
        }

        Source* getSources() const {
            return (Source*) (this + 1);
        }

        bool migrateRange(Table* srcTable, ureg startIdx);
        virtual void run() TURF_OVERRIDE;
    };

    static Cell* find(Hash hash, Table* table) {
        TURF_TRACE(Leapfrog, 0, "[find] called", uptr(table), hash);
        TURF_ASSERT(table);
        TURF_ASSERT(hash != KeyTraits::NullHash);
        ureg sizeMask = table->sizeMask;
        // Optimistically check hashed cell even though it might belong to another bucket
        ureg idx = hash & sizeMask;
        CellGroup* group = table->getCellGroups() + (idx >> 2);
        Cell* cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(turf::Relaxed);
        if (probeHash == hash) {
            TURF_TRACE(Leapfrog, 1, "[find] found existing cell optimistically", uptr(table), idx);
            return cell;
        } else if (probeHash == KeyTraits::NullHash) {
            return NULL;
        }
        // Follow probe chain for our bucket
        u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
        while (delta) {
            idx = (idx + delta) & sizeMask;
            group = table->getCellGroups() + (idx >> 2);
            cell = group->cells + (idx & 3);
            Hash probeHash = cell->hash.load(turf::Relaxed);
            // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
            // but we don't check for it. We just follow the probe chain.
            if (probeHash == hash) {
                TURF_TRACE(Leapfrog, 2, "[find] found existing cell", uptr(table), idx);
                return cell;
            }
            delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
        }
        // End of probe chain, not found
        return NULL;
    }
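
    // Usage sketch (illustrative only; the real callers are the public map classes built
    // on this policy):
    //
    //     Cell* cell = Leapfrog<Map>::find(hash, table);
    //     Value v = cell ? cell->value.load(turf::Consume) : Value(ValueTraits::NullValue);
    //
    // A non-NULL cell does not guarantee a live entry: the value may still be NullValue
    // (erased) or Redirect (being migrated), so callers must check it before use.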

    // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
    enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
    static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell, ureg& overflowIdx) {
        TURF_TRACE(Leapfrog, 3, "[insertOrFind] called", uptr(table), hash);
        TURF_ASSERT(table);
        TURF_ASSERT(hash != KeyTraits::NullHash);
        ureg sizeMask = table->sizeMask;
        ureg idx = ureg(hash);

        // Check hashed cell first, though it may not even belong to the bucket.
        CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
        cell = group->cells + (idx & 3);
        Hash probeHash = cell->hash.load(turf::Relaxed);
        if (probeHash == KeyTraits::NullHash) {
            if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
                TURF_TRACE(Leapfrog, 4, "[insertOrFind] reserved first cell", uptr(table), idx);
                // There are no links to set. We're done.
                return InsertResult_InsertedNew;
            } else {
                TURF_TRACE(Leapfrog, 5, "[insertOrFind] race to reserve first cell", uptr(table), idx);
                // Fall through to check if it was the same hash...
            }
        }
        if (probeHash == hash) {
            TURF_TRACE(Leapfrog, 6, "[insertOrFind] found in first cell", uptr(table), idx);
            return InsertResult_AlreadyFound;
        }

        // Follow the link chain for this bucket.
        ureg maxIdx = idx + sizeMask;
        ureg linkLevel = 0;
        turf::Atomic<u8>* prevLink;
        for (;;) {
        followLink:
            prevLink = group->deltas + ((idx & 3) + linkLevel);
            linkLevel = 4;
            u8 probeDelta = prevLink->load(turf::Relaxed);
            if (probeDelta) {
                idx += probeDelta;
                // Check the hash for this cell.
                group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                cell = group->cells + (idx & 3);
                probeHash = cell->hash.load(turf::Relaxed);
                if (probeHash == KeyTraits::NullHash) {
                    // Cell was linked, but hash is not visible yet.
                    // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
                    // just poll until it becomes visible.
                    TURF_TRACE(Leapfrog, 7, "[insertOrFind] race to read hash", uptr(table), idx);
                    do {
                        probeHash = cell->hash.load(turf::Acquire);
                    } while (probeHash == KeyTraits::NullHash);
                }
                TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
                if (probeHash == hash) {
                    TURF_TRACE(Leapfrog, 8, "[insertOrFind] found in probe chain", uptr(table), idx);
                    return InsertResult_AlreadyFound;
                }
            } else {
                // Reached the end of the link chain for this bucket.
                // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
                ureg prevLinkIdx = idx;
                TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
                ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
                while (linearProbesRemaining-- > 0) {
                    idx++;
                    group = table->getCellGroups() + ((idx & sizeMask) >> 2);
                    cell = group->cells + (idx & 3);
                    probeHash = cell->hash.load(turf::Relaxed);
                    if (probeHash == KeyTraits::NullHash) {
                        // It's an empty cell. Try to reserve it.
                        if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
                            // Success. We've reserved the cell. Link it to previous cell in same bucket.
                            TURF_TRACE(Leapfrog, 9, "[insertOrFind] reserved cell", uptr(table), idx);
                            TURF_ASSERT(probeDelta == 0);
                            u8 desiredDelta = idx - prevLinkIdx;
#if TURF_WITH_ASSERTS
                            probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
                            TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
#else
                            prevLink->store(desiredDelta, turf::Relaxed);
#endif
                            return InsertResult_InsertedNew;
                        } else {
                            TURF_TRACE(Leapfrog, 10, "[insertOrFind] race to reserve cell", uptr(table), idx);
                            // Fall through to check if it's the same hash...
                        }
                    }
                    Hash x = (probeHash ^ hash);
                    // Check for same hash.
                    if (!x) {
                        TURF_TRACE(Leapfrog, 11, "[insertOrFind] found outside probe chain", uptr(table), idx);
                        return InsertResult_AlreadyFound;
                    }
                    // Check for same bucket.
                    if ((x & sizeMask) == 0) {
                        TURF_TRACE(Leapfrog, 12, "[insertOrFind] found late-arriving cell in same bucket", uptr(table), idx);
                        // Attempt to set the link on behalf of the late-arriving cell.
                        // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
                        // there's no guarantee that our own link chain will be well-formed by the time this function returns.
                        // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
                        u8 desiredDelta = idx - prevLinkIdx;
#if TURF_WITH_ASSERTS
                        probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
                        TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
                        if (probeDelta == 0)
                            TURF_TRACE(Leapfrog, 13, "[insertOrFind] set link on behalf of late-arriving cell", uptr(table), idx);
#else
                        prevLink->store(desiredDelta, turf::Relaxed);
#endif
                        goto followLink; // Try to follow link chain for the bucket again.
                    }
                    // Continue linear search...
                }
                // Table is too full to insert.
                overflowIdx = idx + 1;
                TURF_TRACE(Leapfrog, 14, "[insertOrFind] overflow", uptr(table), overflowIdx);
                return InsertResult_Overflow;
            }
        }
    }
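
    // Typical caller pattern (illustrative sketch, not part of the original source):
    //
    //     Cell* cell;
    //     ureg overflowIdx;
    //     switch (insertOrFind(hash, table, cell, overflowIdx)) {
    //     case InsertResult_AlreadyFound: // cell points at the existing entry
    //     case InsertResult_InsertedNew:  // cell's hash is reserved; caller stores the value
    //         break;
    //     case InsertResult_Overflow:     // table too full; caller starts/joins a migration
    //         beginTableMigration(map, table, overflowIdx);
    //         break;
    //     }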

    static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
        // Create new migration by DCLI.
        TURF_TRACE(Leapfrog, 15, "[beginTableMigrationToSize] called", 0, 0);
        SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
        if (job) {
            TURF_TRACE(Leapfrog, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
        } else {
            turf::LockGuard<turf::Mutex> guard(table->mutex);
            job = table->jobCoordinator.loadConsume(); // A plain non-atomic load would suffice here, since we hold the mutex.
            if (job) {
                TURF_TRACE(Leapfrog, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
            } else {
                // Create new migration.
                TableMigration* migration = TableMigration::create(map, 1);
                migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
                migration->getSources()[0].table = table;
                migration->getSources()[0].sourceIndex.storeNonatomic(0);
                migration->m_destination = Table::create(nextTableSize);
                // Publish the new migration.
                table->jobCoordinator.storeRelease(migration);
            }
        }
    }

    static void beginTableMigration(Map& map, Table* table, ureg overflowIdx) {
        // Estimate number of cells in use based on a small sample.
        ureg sizeMask = table->sizeMask;
        ureg idx = overflowIdx - CellsInUseSample;
        ureg inUseCells = 0;
        for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
            CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
            Cell* cell = group->cells + (idx & 3);
            Value value = cell->value.load(turf::Relaxed);
            if (value == Value(ValueTraits::Redirect)) {
                // Another thread kicked off the jobCoordinator. The caller will participate upon return.
                TURF_TRACE(Leapfrog, 18, "[beginTableMigration] redirected while determining table size", 0, 0);
                return;
            }
            if (value != Value(ValueTraits::NullValue))
                inUseCells++;
            idx++;
        }
        float inUseRatio = float(inUseCells) / CellsInUseSample;
        float estimatedInUse = (sizeMask + 1) * inUseRatio;
#if JUNCTION_LEAPFROG_FORCE_MIGRATION_OVERFLOWS
        // Periodically underestimate the number of cells in use.
        // This exercises the code that handles overflow during migration.
        static ureg counter = 1;
        if ((++counter & 3) == 0) {
            estimatedInUse /= 4;
        }
#endif
        ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
        beginTableMigrationToSize(map, table, nextTableSize);
    }
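
    // Worked example of the sizing estimate (illustrative): in a 1024-cell table
    // (sizeMask == 1023), if 48 of the 128 sampled cells are in use, then
    // inUseRatio == 0.375 and estimatedInUse == 384, so the next table size is
    // roundUpPowerOf2(768) == 1024. Doubling the estimate leaves headroom so the
    // destination doesn't immediately overflow again.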
}; // Leapfrog

template <class Map>
bool Leapfrog<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
    ureg srcSizeMask = srcTable->sizeMask;
    ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
    // Iterate over source range.
    for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
        CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
        Cell* srcCell = srcGroup->cells + (srcIdx & 3);
        Hash srcHash;
        Value srcValue;
        // Fetch the srcHash and srcValue.
        for (;;) {
            srcHash = srcCell->hash.load(turf::Relaxed);
            if (srcHash == KeyTraits::NullHash) {
                // An unused cell. Try to put a Redirect marker in its value.
                srcValue =
                    srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
                if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    TURF_TRACE(Leapfrog, 19, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
                    break;
                }
                if (srcValue == Value(ValueTraits::NullValue))
                    break; // Redirect has been placed. Break inner loop, continue outer loop.
                TURF_TRACE(Leapfrog, 20, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
                // Otherwise, somebody just claimed the cell. Read srcHash again...
            } else {
                // Check for deleted/uninitialized value.
                srcValue = srcCell->value.load(turf::Relaxed);
                if (srcValue == Value(ValueTraits::NullValue)) {
                    // Try to put a Redirect marker.
                    if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
                        break; // Redirect has been placed. Break inner loop, continue outer loop.
                    TURF_TRACE(Leapfrog, 21, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
                    if (srcValue == Value(ValueTraits::Redirect)) {
                        // FIXME: I don't think this will happen. Investigate & change to assert
                        TURF_TRACE(Leapfrog, 22, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
                        break;
                    }
                } else if (srcValue == Value(ValueTraits::Redirect)) {
                    // srcValue is already marked Redirect due to previous incomplete migration.
                    TURF_TRACE(Leapfrog, 23, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
                    break;
                }

                // We've got a key/value pair to migrate.
                // Reserve a destination cell in the destination.
                TURF_ASSERT(srcHash != KeyTraits::NullHash);
                TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
                TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
                Cell* dstCell;
                ureg overflowIdx;
                InsertResult result = insertOrFind(srcHash, m_destination, dstCell, overflowIdx);
                // During migration, a hash can only exist in one place among all the source tables,
                // and it is only migrated by one thread. Therefore, the hash will never already exist
                // in the destination table:
                TURF_ASSERT(result != InsertResult_AlreadyFound);
                if (result == InsertResult_Overflow) {
                    // Destination overflow.
                    // This can happen for several reasons. For example, the source table could have
                    // consisted mostly of deleted cells when it overflowed, resulting in a small destination
                    // table size, but then another thread could re-insert all the same hashes
                    // before the migration completed.
                    // Caller will cancel the current migration and begin a new one.
                    return false;
                }
                // Migrate the old value to the new cell.
                for (;;) {
                    // Copy srcValue to the destination.
                    dstCell->value.store(srcValue, turf::Relaxed);
                    // Try to place a Redirect marker in srcValue.
                    Value doubleCheckedSrcValue =
                        srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
                    TURF_ASSERT(doubleCheckedSrcValue !=
                                Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
                    if (doubleCheckedSrcValue == srcValue) {
                        // No racing writes to the src. We've successfully placed the Redirect marker.
                        // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
                        // by a late-arriving erase.
                        if (srcValue == Value(ValueTraits::NullValue))
                            TURF_TRACE(Leapfrog, 24, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
                        break;
                    }
                    // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
                    TURF_TRACE(Leapfrog, 25, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
                    srcValue = doubleCheckedSrcValue;
                }
                // Cell successfully migrated. Proceed to next source cell.
                break;
            }
        }
    }
    // Range has been migrated successfully.
    return true;
}
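
// Informal summary of the per-cell protocol above: a source cell is claimed by CAS'ing
// Redirect into its value. Readers that encounter Redirect retry in the destination table.
// If a writer races in a new value after we've copied the old one, our CAS fails; we copy
// the newer value to the destination and retry until the Redirect marker sticks.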

template <class Map>
void Leapfrog<Map>::TableMigration::run() {
    // Conditionally increment the shared # of workers.
    ureg probeStatus = m_workerStatus.load(turf::Relaxed);
    do {
        if (probeStatus & 1) {
            // End flag is already set, so do nothing.
            TURF_TRACE(Leapfrog, 26, "[TableMigration::run] already ended", uptr(this), 0);
            return;
        }
    } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
    // # of workers has been incremented, and the end flag is clear.
    TURF_ASSERT((probeStatus & 1) == 0);

    // Iterate over all source tables.
    for (ureg s = 0; s < m_numSources; s++) {
        Source& source = getSources()[s];
        // Loop over all migration units in this source table.
        for (;;) {
            if (m_workerStatus.load(turf::Relaxed) & 1) {
                TURF_TRACE(Leapfrog, 27, "[TableMigration::run] detected end flag set", uptr(this), 0);
                goto endMigration;
            }
            ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
            if (startIdx >= source.table->sizeMask + 1)
                break; // No more migration units in this table. Try next source table.
            bool overflowed = !migrateRange(source.table, startIdx);
            if (overflowed) {
                // *** FAILED MIGRATION ***
                // TableMigration failed due to destination table overflow.
                // No other thread can declare the migration successful at this point, because *this* unit will never complete,
                // hence m_unitsRemaining won't reach zero.
                // However, multiple threads can independently detect a failed migration at the same time.
                TURF_TRACE(Leapfrog, 28, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
                // The reason we store overflowed in a shared variable is that we must flush all the worker threads before
                // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different
                // from the thread that deals with it.
                bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
                if (oldOverflowed)
                    TURF_TRACE(Leapfrog, 29, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
                               uptr(oldOverflowed));
                m_workerStatus.fetchOr(1, turf::Relaxed);
                goto endMigration;
            }
            sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
            TURF_ASSERT(prevRemaining > 0);
            if (prevRemaining == 1) {
                // *** SUCCESSFUL MIGRATION ***
                // That was the last chunk to migrate.
                m_workerStatus.fetchOr(1, turf::Relaxed);
                goto endMigration;
            }
        }
    }
    TURF_TRACE(Leapfrog, 30, "[TableMigration::run] out of migration units", uptr(this), 0);

endMigration:
    // Decrement the shared # of workers.
    probeStatus = m_workerStatus.fetchSub(
        2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
    if (probeStatus >= 4) {
        // There are other workers remaining. Return here so that only the very last worker will proceed.
        TURF_TRACE(Leapfrog, 31, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
        return;
    }

    // We're the very last worker thread.
    // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
    TURF_ASSERT(probeStatus == 3);
    bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
    if (!overflowed) {
        // The migration succeeded. This is the most likely outcome. Publish the new table.
        m_map.publishTableMigration(this);
        // End the jobCoordinator.
        getSources()[0].table->jobCoordinator.end();
    } else {
        // The migration failed due to the overflow of the destination table.
        Table* origTable = getSources()[0].table;
        turf::LockGuard<turf::Mutex> guard(origTable->mutex);
        SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
        if (checkedJob != this) {
            TURF_TRACE(Leapfrog, 32, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
                       uptr(checkedJob));
        } else {
            TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
            // Double the destination table size.
            migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
            // Transfer source tables to the new migration.
            for (ureg i = 0; i < m_numSources; i++) {
                migration->getSources()[i].table = getSources()[i].table;
                getSources()[i].table = NULL;
                migration->getSources()[i].sourceIndex.storeNonatomic(0);
            }
            migration->getSources()[m_numSources].table = m_destination;
            migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
            // Calculate total number of migration units to move.
            ureg unitsRemaining = 0;
            for (ureg s = 0; s < migration->m_numSources; s++)
                unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
            migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
            // Publish the new migration.
            origTable->jobCoordinator.storeRelease(migration);
        }
    }

    // We're done with this TableMigration. Queue it for GC.
    DefaultQSBR.enqueue(&TableMigration::destroy, this);
}
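
// Note on m_workerStatus (informal): each active worker adds 2, so the value is
// 2 * workers + endFlag. fetchSub(2) above returns the pre-decrement value; seeing
// exactly 3 means the end flag is set and this was the last remaining worker, making
// this thread responsible for the publish-or-restart step.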

} // namespace details
} // namespace junction

#endif // JUNCTION_DETAILS_LEAPFROG_H