1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_DETAILS_GRAMPA_H
14 #define JUNCTION_DETAILS_GRAMPA_H
15
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <junction/striped/Mutex.h>
19 #include <junction/striped/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
26 #include <memory.h>
27
28 namespace junction {
29 namespace details {
30
31 #if JUNCTION_TRACK_GRAMPA_STATS
32 struct GrampaCounter {
33     turf::Atomic<ureg> total;
34     turf::Atomic<sreg> current;
35
36     void increment() {
37         total.fetchAdd(1, turf::Relaxed);
38         current.fetchAdd(1, turf::Relaxed);
39     }
40
41     void decrement() {
42         current.fetchSub(1, turf::Relaxed);
43     }
44 };
45
46 struct GrampaStats {
47     GrampaCounter numTables;
48     GrampaCounter numTableMigrations;
49     GrampaCounter numFlatTrees;
50     GrampaCounter numFlatTreeMigrations;
51
52     static GrampaStats Instance; // Zero-initialized
53 };
54 #endif
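// Hypothetical usage sketch: with JUNCTION_TRACK_GRAMPA_STATS enabled, a test harness could read
// the counters after a workload, e.g.:
//     ureg tablesEverCreated = GrampaStats::Instance.numTables.total.load(turf::Relaxed);
//     sreg tablesStillLive = GrampaStats::Instance.numTables.current.load(turf::Relaxed);
// (The variable names are hypothetical; this header only increments and decrements the counters.)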
55
56 TURF_TRACE_DECLARE(Grampa, 38)
57
58 template <class Map>
59 struct Grampa {
60     typedef typename Map::Hash Hash;
61     typedef typename Map::Value Value;
62     typedef typename Map::KeyTraits KeyTraits;
63     typedef typename Map::ValueTraits ValueTraits;
64
65     static const ureg RedirectFlatTree = 1;
66     static const ureg InitialSize = 8;
67     static const ureg TableMigrationUnitSize = 32;
68     static const ureg FlatTreeMigrationUnitSize = 32;
69     static const ureg LinearSearchLimit = 128;
70     static const ureg CellsInUseSample = LinearSearchLimit;
71     TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must fit in CellGroup::deltas
72     TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
73
74     static const ureg MinTableSize = 8;
75     static const ureg LeafSizeBits = 10;
76     static const ureg LeafSize = (ureg(1) << LeafSizeBits);
77
78     struct Cell {
79         // If value == Redirect, threads participate in the jobCoordinator.
80         turf::Atomic<Hash> hash;
81         turf::Atomic<Value> value;
82     };
83
84     struct CellGroup {
85         // Every cell in the table actually represents a bucket of cells, all linked together in a probe chain.
86         // Each cell in the probe chain is located within the table itself.
87         // "deltas" determines the index of the next cell in the probe chain.
88         // The first cell in the chain is the one that was hashed. It may or may not actually belong in the bucket.
89         // The "second" cell in the chain is given by deltas 0 - 3. It's guaranteed to belong in the bucket.
90         // All subsequent cells in the chain are given by deltas 4 - 7. They are also guaranteed to belong in the bucket.
91         turf::Atomic<u8> deltas[8];
92         Cell cells[4];
93     };
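    // Worked example: suppose a lookup hashes to cell index 5. The "first" cell is cells[1] of
    // group 1. If that group's deltas[1] holds 12, the next cell in the bucket's chain is at
    // index (5 + 12) & sizeMask == 17, i.e. cells[1] of group 4; from there, deltas[1 + 4] of
    // group 4 gives the offset to the following cell, and a delta of 0 ends the chain.
    // This mirrors the probe loop in find() below.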
94
95     struct Table {
96         // unsafeRangeShift determines how many slots are occupied by this Table in the flattree.
97         // The range of hashes stored in this table is given by (1 << shift).
98         // e.g. If the entire map is stored in a single table, then Table::shift == HASH_BITS.
99         // If the entire map is stored in two tables, then Table::shift == (HASH_BITS - 1) for each table.
100         // FlatTree::shift is always <= Table::shift for all the tables it contains.
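        // Worked example (assuming 64-bit hashes): a table holding one quarter of the key space
        // has unsafeRangeShift == 62 and covers hashes [baseHash, baseHash + (1 << 62)). The shift
        // is "unsafe" because a table covering the entire map has unsafeRangeShift == 64, and
        // evaluating (1 << 64) directly would be undefined behavior.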
101         const ureg sizeMask; // a power of two minus one
102         const Hash baseHash;
103         const ureg unsafeRangeShift;
104         junction::striped::ManualResetEvent
105             isPublished;                // To prevent publishing a subtree before its parent is published (happened in testing)
106         junction::striped::Mutex mutex; // to DCLI the TableMigration (stored in the jobCoordinator)
107         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
108
109         Table(ureg sizeMask, Hash baseHash, ureg unsafeRangeShift)
110             : sizeMask(sizeMask), baseHash(baseHash), unsafeRangeShift(unsafeRangeShift) {
111         }
112
113         static Table* create(ureg tableSize, Hash baseHash, ureg unsafeShift) {
114             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
115             TURF_ASSERT(unsafeShift > 0 && unsafeShift <= sizeof(Hash) * 8);
116             TURF_ASSERT(tableSize >= 4);
117             ureg numGroups = tableSize >> 2;
118             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(CellGroup) * numGroups);
119             new (table) Table(tableSize - 1, baseHash, (u8) unsafeShift);
120             for (ureg i = 0; i < numGroups; i++) {
121                 CellGroup* group = table->getCellGroups() + i;
122                 for (ureg j = 0; j < 4; j++) {
123                     group->deltas[j].storeNonatomic(0);
124                     group->deltas[j + 4].storeNonatomic(0);
125                     group->cells[j].hash.storeNonatomic(KeyTraits::NullHash);
126                     group->cells[j].value.storeNonatomic(Value(ValueTraits::NullValue));
127                 }
128             }
129 #if JUNCTION_TRACK_GRAMPA_STATS
130             GrampaStats::Instance.numTables.increment();
131 #endif
132             return table;
133         }
134
135         void destroy() {
136 #if JUNCTION_TRACK_GRAMPA_STATS
137             GrampaStats::Instance.numTables.decrement();
138 #endif
139             this->Table::~Table();
140             TURF_HEAP.free(this);
141         }
142
143         CellGroup* getCellGroups() const {
144             return (CellGroup*) (this + 1);
145         }
146
147         ureg getNumMigrationUnits() const {
148             return sizeMask / TableMigrationUnitSize + 1;
149         }
150     };
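    // Memory layout note: Table::create() makes a single variable-size allocation laid out as
    // [Table][CellGroup 0 .. numGroups - 1], where numGroups == tableSize / 4; getCellGroups()
    // returns the address just past the Table header. A 1024-cell table therefore carries 256
    // trailing CellGroups.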
151
152     class TableMigration : public SimpleJobCoordinator::Job {
153     public:
154         struct Source {
155             Table* table;
156             turf::Atomic<ureg> sourceIndex;
157         };
158
159         Map& m_map;
160         Hash m_baseHash; // The lowest possible hash value in this subtree; determines index in flattree.
161         // If m_numDestinations == 1, m_safeShift == 0.
162         // Otherwise, m_safeShift tells (indirectly) the size of the flattree in which our subtree would exactly fit:
163         // 1 << (HASH_BITS - m_safeShift).
164         // This ensures that m_safeShift is always less than sizeof(Hash) * 8, so that shifting by m_safeShift is not undefined behavior.
165         // To determine the subtree index for a hash during migration, we use: (hash >> m_safeShift) & (m_numDestinations - 1).
166         // A mask is used since we are only migrating a subtree -- not necessarily the entire map.
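        // Worked example (assuming 64-bit hashes): migrating a whole-map table into 4 destination
        // leaves gives m_numDestinations == 4 and m_safeShift == 62, so a hash is routed to
        // destination (hash >> 62) & 3 during migration.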
167         ureg m_safeShift;
168         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
169         turf::Atomic<sreg> m_overflowTableIndex;
170         turf::Atomic<sreg> m_unitsRemaining;
171         ureg m_numSources;
172         ureg m_numDestinations; // The size of the subtree being created. Some table pointers may be repeated.
173
174         TableMigration(Map& map) : m_map(map) {
175         }
176
177         static TableMigration* create(Map& map, ureg numSources, ureg numDestinations) {
178             TableMigration* migration = (TableMigration*) TURF_HEAP.alloc(
179                 sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources + sizeof(Table*) * numDestinations);
180             new (migration) TableMigration(map);
181             migration->m_workerStatus.storeNonatomic(0);
182             migration->m_overflowTableIndex.storeNonatomic(-1);
183             migration->m_unitsRemaining.storeNonatomic(0);
184             migration->m_numSources = numSources;
185             migration->m_numDestinations = numDestinations;
186 #if JUNCTION_TRACK_GRAMPA_STATS
187             GrampaStats::Instance.numTableMigrations.increment();
188 #endif
189             // Caller is responsible for filling in source & destination pointers
190             return migration;
191         }
192
193         virtual ~TableMigration() TURF_OVERRIDE {
194         }
195
196         void destroy() {
197 #if JUNCTION_TRACK_GRAMPA_STATS
198             GrampaStats::Instance.numTableMigrations.decrement();
199 #endif
200             // Destroy all source tables.
201             for (ureg i = 0; i < m_numSources; i++)
202                 if (getSources()[i].table)
203                     getSources()[i].table->destroy();
204             // Delete the migration object itself.
205             this->TableMigration::~TableMigration();
206             TURF_HEAP.free(this);
207         }
208
209         ureg getUnsafeShift() const {
210             return m_safeShift ? m_safeShift : (sizeof(Hash) * 8);
211         }
212
213         Source* getSources() const {
214             return (Source*) (this + 1);
215         }
216
217         Table** getDestinations() const {
218             return (Table**) (getSources() + m_numSources);
219         }
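        // Memory layout note: TableMigration::create() makes one variable-size allocation laid out
        // as [TableMigration][Source x numSources][Table* x numDestinations]; getSources() and
        // getDestinations() simply index into those trailing arrays.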
220
221         sreg migrateRange(Table* srcTable, ureg startIdx);
222         virtual void run() TURF_OVERRIDE;
223     };
224
225     class FlatTreeMigration;
226
227     struct FlatTree {
228         // The size of the flattree is 1 << (HASH_BITS - shift).
229         // Or, stated another way, (Hash(-1) >> shift) + 1.
230         // To determine the flattree index for a given hash, we simply use: (hash >> shift)
231         // Smaller shift == more of the hash's upper bits used as the index == bigger flattree.
232         // For example, the simplest flattree has only two entries, and only the most significant
233         // bit of each hash is used as the flattree index. In that case, shift == HASH_BITS - 1.
234         // Each time the flattree doubles in size, shift decreases by 1.
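        // Worked example (assuming 64-bit hashes): the smallest flattree has safeShift == 63 and
        // two entries, indexed by the top bit of the hash (hash >> 63); doubling it gives
        // safeShift == 62 and four entries, indexed by the top two bits.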
235         const ureg safeShift;
236         junction::striped::Mutex mutex;
237         FlatTreeMigration* migration; // Protected by mutex
238
239         FlatTree(ureg safeShift) : safeShift(safeShift), migration(NULL) {
240             // A FlatTree always has at least two tables, so the shift is always safe.
241             TURF_ASSERT(safeShift < sizeof(Hash) * 8);
242         }
243
244         static FlatTree* create(ureg safeShift) {
245             // A flattree always has at least two tables, so the shift is always safe.
246             TURF_ASSERT(safeShift < sizeof(Hash) * 8);
247             ureg numLeaves = (Hash(-1) >> safeShift) + 1;
248             FlatTree* flatTree = (FlatTree*) TURF_HEAP.alloc(sizeof(FlatTree) + sizeof(turf::Atomic<Table*>) * numLeaves);
249             new (flatTree) FlatTree(safeShift);
250 #if JUNCTION_TRACK_GRAMPA_STATS
251             GrampaStats::Instance.numFlatTrees.increment();
252 #endif
253             // Caller will initialize flatTree->getTables()
254             return flatTree;
255         }
256
257         void destroy() {
258 #if JUNCTION_TRACK_GRAMPA_STATS
259             GrampaStats::Instance.numFlatTrees.decrement();
260 #endif
261             this->FlatTree::~FlatTree();
262             TURF_HEAP.free(this);
263         }
264
265         turf::Atomic<Table*>* getTables() const {
266             return (turf::Atomic<Table*>*) (this + 1);
267         }
268
269         ureg getSize() const {
270             return (Hash(-1) >> safeShift) + 1;
271         }
272
273         ureg getNumMigrationUnits() const {
274             ureg sizeMask = Hash(-1) >> safeShift;
275             return sizeMask / FlatTreeMigrationUnitSize + 1;
276         }
277     };
278
279     class FlatTreeMigration : public SimpleJobCoordinator::Job {
280     public:
281         Map& m_map;
282         FlatTree* m_source;
283         FlatTree* m_destination;
284         turf::Atomic<ureg> m_workerStatus;
285         turf::Atomic<ureg> m_sourceIndex;
286         turf::Atomic<sreg> m_unitsRemaining;
287         junction::striped::ManualResetEvent m_completed;
288
289         FlatTreeMigration(Map& map, FlatTree* flatTree, ureg shift) : m_map(map) {
290             m_source = flatTree;
291             m_destination = FlatTree::create(shift);
292             m_workerStatus.storeNonatomic(0);
293             m_sourceIndex.storeNonatomic(0);
294             m_unitsRemaining.storeNonatomic(flatTree->getNumMigrationUnits());
295 #if JUNCTION_TRACK_GRAMPA_STATS
296             GrampaStats::Instance.numFlatTreeMigrations.increment();
297 #endif
298         }
299
300         virtual ~FlatTreeMigration() TURF_OVERRIDE {
301 #if JUNCTION_TRACK_GRAMPA_STATS
302             GrampaStats::Instance.numFlatTreeMigrations.decrement();
303 #endif
304             // Delete source flattree.
305             m_source->destroy();
306         }
307
308         void destroy() {
309             delete this;
310         }
311
312         virtual void run() TURF_OVERRIDE;
313     };
314
315     static void garbageCollectTable(Table* table) {
316         TURF_ASSERT(table);
317         DefaultQSBR.enqueue(&Table::destroy, table);
318     }
319
320     static void garbageCollectFlatTree(FlatTree* flatTree) {
321         TURF_ASSERT(flatTree);
322         DefaultQSBR.enqueue(&FlatTree::destroy, flatTree);
323     }
324
325     static Cell* find(Hash hash, Table* table, ureg sizeMask) {
326         TURF_TRACE(Grampa, 0, "[find] called", uptr(table), hash);
327         TURF_ASSERT(table);
328         TURF_ASSERT(hash != KeyTraits::NullHash);
329         // Optimistically check hashed cell even though it might belong to another bucket
330         ureg idx = hash & sizeMask;
331         CellGroup* group = table->getCellGroups() + (idx >> 2);
332         Cell* cell = group->cells + (idx & 3);
333         Hash probeHash = cell->hash.load(turf::Relaxed);
334         if (probeHash == hash) {
335             TURF_TRACE(Grampa, 1, "[find] found existing cell optimistically", uptr(table), idx);
336             return cell;
337         } else if (probeHash == KeyTraits::NullHash) {
338             return cell = NULL;
339         }
340         // Follow probe chain for our bucket
341         u8 delta = group->deltas[idx & 3].load(turf::Relaxed);
342         while (delta) {
343             idx = (idx + delta) & sizeMask;
344             group = table->getCellGroups() + (idx >> 2);
345             cell = group->cells + (idx & 3);
346             Hash probeHash = cell->hash.load(turf::Relaxed);
347             // Note: probeHash might actually be NULL due to memory reordering of a concurrent insert,
348             // but we don't check for it. We just follow the probe chain.
349             if (probeHash == hash) {
350                 TURF_TRACE(Grampa, 2, "[find] found existing cell", uptr(table), idx);
351                 return cell;
352             }
353             delta = group->deltas[(idx & 3) + 4].load(turf::Relaxed);
354         }
355         // End of probe chain, not found
356         return NULL;
357     }
358
359     // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
360     enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
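    // Hypothetical caller sketch (the actual callers live in the Map class that instantiates this
    // policy, not in this header): a caller typically reserves a cell with insert(), then stores
    // its value; on InsertResult_Overflow it kicks off beginTableMigration() and retries once the
    // migration completes. overflowIdx is only meaningful for the Overflow result, where it seeds
    // the in-use estimate in beginTableMigration().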
361     static InsertResult insert(Hash hash, Table* table, ureg sizeMask, Cell*& cell, ureg& overflowIdx) {
362         TURF_TRACE(Grampa, 3, "[insert] called", uptr(table), hash);
363         TURF_ASSERT(table);
364         TURF_ASSERT(hash != KeyTraits::NullHash);
365         ureg idx = ureg(hash);
366
367         // Check hashed cell first, though it may not even belong to the bucket.
368         CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
369         cell = group->cells + (idx & 3);
370         Hash probeHash = cell->hash.load(turf::Relaxed);
371         if (probeHash == KeyTraits::NullHash) {
372             if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
373                 TURF_TRACE(Grampa, 4, "[insert] reserved first cell", uptr(table), idx);
374                 // There are no links to set. We're done.
375                 return InsertResult_InsertedNew;
376             } else {
377                 TURF_TRACE(Grampa, 5, "[insert] race to reserve first cell", uptr(table), idx);
378                 // Fall through to check if it was the same hash...
379             }
380         }
381         if (probeHash == hash) {
382             TURF_TRACE(Grampa, 6, "[insert] found in first cell", uptr(table), idx);
383             return InsertResult_AlreadyFound;
384         }
385
386         // Follow the link chain for this bucket.
387         ureg maxIdx = idx + sizeMask;
388         ureg linkLevel = 0;
389         turf::Atomic<u8>* prevLink;
390         for (;;) {
391         followLink:
392             prevLink = group->deltas + ((idx & 3) + linkLevel);
393             linkLevel = 4;
394             u8 probeDelta = prevLink->load(turf::Relaxed);
395             if (probeDelta) {
396                 idx += probeDelta;
397                 // Check the hash for this cell.
398                 group = table->getCellGroups() + ((idx & sizeMask) >> 2);
399                 cell = group->cells + (idx & 3);
400                 probeHash = cell->hash.load(turf::Relaxed);
401                 if (probeHash == KeyTraits::NullHash) {
402                     // Cell was linked, but hash is not visible yet.
403                     // We could avoid this case (and guarantee it's visible) using acquire & release, but instead,
404                     // just poll until it becomes visible.
405                     TURF_TRACE(Grampa, 7, "[insert] race to read hash", uptr(table), idx);
406                     do {
407                         probeHash = cell->hash.load(turf::Acquire);
408                     } while (probeHash == KeyTraits::NullHash);
409                 }
410                 TURF_ASSERT(((probeHash ^ hash) & sizeMask) == 0); // Only hashes in same bucket can be linked
411                 if (probeHash == hash) {
412                     TURF_TRACE(Grampa, 8, "[insert] found in probe chain", uptr(table), idx);
413                     return InsertResult_AlreadyFound;
414                 }
415             } else {
416                 // Reached the end of the link chain for this bucket.
417                 // Switch to linear probing until we reserve a new cell or find a late-arriving cell in the same bucket.
418                 ureg prevLinkIdx = idx;
419                 TURF_ASSERT(sreg(maxIdx - idx) >= 0); // Nobody would have linked an idx that's out of range.
420                 ureg linearProbesRemaining = turf::util::min(maxIdx - idx, LinearSearchLimit);
421                 while (linearProbesRemaining-- > 0) {
422                     idx++;
423                     group = table->getCellGroups() + ((idx & sizeMask) >> 2);
424                     cell = group->cells + (idx & 3);
425                     probeHash = cell->hash.load(turf::Relaxed);
426                     if (probeHash == KeyTraits::NullHash) {
427                         // It's an empty cell. Try to reserve it.
428                         if (cell->hash.compareExchangeStrong(probeHash, hash, turf::Relaxed)) {
429                             // Success. We've reserved the cell. Link it to previous cell in same bucket.
430                             TURF_TRACE(Grampa, 9, "[insert] reserved cell", uptr(table), idx);
431                             TURF_ASSERT(probeDelta == 0);
432                             u8 desiredDelta = idx - prevLinkIdx;
433 #if TURF_WITH_ASSERTS
434                             // Note: another thread could actually set the link on our behalf (see below).
435                             probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
436                             TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
437 #else
438                             prevLink->store(desiredDelta, turf::Relaxed);
439 #endif
440                             return InsertResult_InsertedNew;
441                         } else {
442                             TURF_TRACE(Grampa, 10, "[insert] race to reserve cell", uptr(table), idx);
443                             // Fall through to check if it's the same hash...
444                         }
445                     }
446                     Hash x = (probeHash ^ hash);
447                     // Check for same hash.
448                     if (!x) {
449                         TURF_TRACE(Grampa, 11, "[insert] found outside probe chain", uptr(table), idx);
450                         return InsertResult_AlreadyFound;
451                     }
452                     // Check for same bucket.
453                     if ((x & sizeMask) == 0) {
454                         TURF_TRACE(Grampa, 12, "[insert] found late-arriving cell in same bucket", uptr(table), idx);
455                         // Attempt to set the link on behalf of the late-arriving cell.
456                         // This is usually redundant, but if we don't attempt to set the late-arriving cell's link here,
457                         // there's no guarantee that our own link chain will be well-formed by the time this function returns.
458                         // (Indeed, subsequent lookups sometimes failed during testing, for this exact reason.)
459                         u8 desiredDelta = idx - prevLinkIdx;
460 #if TURF_WITH_ASSERTS
461                         probeDelta = prevLink->exchange(desiredDelta, turf::Relaxed);
462                         TURF_ASSERT(probeDelta == 0 || probeDelta == desiredDelta);
463                         if (probeDelta == 0)
464                             TURF_TRACE(Grampa, 13, "[insert] set link on behalf of late-arriving cell", uptr(table), idx);
465 #else
466                         prevLink->store(desiredDelta, turf::Relaxed);
467 #endif
468                         goto followLink; // Try to follow link chain for the bucket again.
469                     }
470                     // Continue linear search...
471                 }
472                 // Table is too full to insert.
473                 overflowIdx = idx + 1;
474                 TURF_TRACE(Grampa, 14, "[insert] overflow", uptr(table), overflowIdx);
475                 return InsertResult_Overflow;
476             }
477         }
478     }
479
480     static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize, ureg splitShift) {
481         // Create new migration by DCLI.
482         TURF_TRACE(Grampa, 15, "[beginTableMigrationToSize] called", 0, 0);
483         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
484         if (job) {
485             TURF_TRACE(Grampa, 16, "[beginTableMigrationToSize] new migration already exists", 0, 0);
486         } else {
487             turf::LockGuard<junction::striped::Mutex> guard(table->mutex);
488             job = table->jobCoordinator.loadConsume(); // A non-atomic load would be sufficient here since we hold the mutex, but loadConsume is harmless.
489             if (job) {
490                 TURF_TRACE(Grampa, 17, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
491             } else {
492                 // Create new migration.
493                 ureg numDestinations = ureg(1) << splitShift;
494                 TableMigration* migration = TableMigration::create(map, 1, numDestinations);
495                 migration->m_baseHash = table->baseHash;
496                 ureg migrationShift = table->unsafeRangeShift - splitShift;
497                 migration->m_safeShift = (migrationShift < sizeof(Hash) * 8) ? migrationShift : 0;
498                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
499                 migration->getSources()[0].table = table;
500                 migration->getSources()[0].sourceIndex.storeNonatomic(0);
501                 ureg subRangeShift =
502                     table->unsafeRangeShift - splitShift; // subRangeShift is also "unsafe" (possibly represents entire range)
503                 ureg hashOffsetDelta = subRangeShift < (sizeof(Hash) * 8) ? (ureg(1) << subRangeShift) : 0;
504                 for (ureg i = 0; i < numDestinations; i++) {
505                     migration->getDestinations()[i] =
506                         Table::create(nextTableSize, table->baseHash + hashOffsetDelta * i, subRangeShift);
507                 }
508                 // Publish the new migration.
509                 table->jobCoordinator.storeRelease(migration);
510             }
511         }
512     }
513
514     static void beginTableMigration(Map& map, Table* table, ureg overflowIdx, bool mustDouble) {
515         ureg nextTableSize;
516         if (mustDouble) {
517             TURF_TRACE(Grampa, 18, "[beginTableMigration] forced to double", 0, 0);
518             nextTableSize = (table->sizeMask + 1) * 2;
519         } else {
520             // Estimate number of cells in use based on a small sample.
521             ureg sizeMask = table->sizeMask;
522             ureg idx = overflowIdx - CellsInUseSample;
523             ureg inUseCells = 0;
524             for (ureg linearProbesRemaining = CellsInUseSample; linearProbesRemaining > 0; linearProbesRemaining--) {
525                 CellGroup* group = table->getCellGroups() + ((idx & sizeMask) >> 2);
526                 Cell* cell = group->cells + (idx & 3);
527                 Value value = cell->value.load(turf::Relaxed);
528                 if (value == Value(ValueTraits::Redirect)) {
529                     // Another thread kicked off the jobCoordinator. The caller will participate upon return.
530                     TURF_TRACE(Grampa, 19, "[beginTableMigration] redirected while determining table size", 0, 0);
531                     return;
532                 }
533                 if (value != Value(ValueTraits::NullValue))
534                     inUseCells++;
535                 idx++;
536             }
537             float inUseRatio = float(inUseCells) / CellsInUseSample;
538             float estimatedInUse = (sizeMask + 1) * inUseRatio;
539             nextTableSize = turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2));
540             // FIXME: Support migrating to smaller tables.
541             nextTableSize = turf::util::max(nextTableSize, sizeMask + 1);
542         }
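        // Worked example for the estimate above: if 96 of the CellsInUseSample == 128 sampled
        // cells are in use in a 1024-cell table, inUseRatio == 0.75, estimatedInUse == 768, and
        // the next table size becomes roundUpPowerOf2(1536) == 2048 cells.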
543         // Split into multiple tables if necessary.
544         ureg splitShift = 0;
545         while (nextTableSize > LeafSize) {
546             splitShift++;
547             nextTableSize >>= 1;
548         }
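        // Worked example: with LeafSize == 1024, a desired size of 4096 cells ends up as
        // splitShift == 2 and nextTableSize == 1024, i.e. the source table is migrated into a
        // subtree of 4 leaf tables.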
549         beginTableMigrationToSize(map, table, nextTableSize, splitShift);
550     }
551
552     static FlatTreeMigration* createFlatTreeMigration(Map& map, FlatTree* flatTree, ureg shift) {
553         turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
554         if (!flatTree->migration) {
555             flatTree->migration = new FlatTreeMigration(map, flatTree, shift);
556         }
557         return flatTree->migration;
558     }
559
560     static FlatTreeMigration* getExistingFlatTreeMigration(FlatTree* flatTree) {
561         turf::LockGuard<junction::striped::Mutex> guard(flatTree->mutex);
562         TURF_ASSERT(flatTree->migration); // Must already exist!
563         return flatTree->migration;
564     }
565 }; // Grampa
566
567 // Return index of the destination table that overflowed, or -1 if none
568 template <class Map>
569 sreg Grampa<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
570     ureg srcSizeMask = srcTable->sizeMask;
571     ureg safeShift = m_safeShift;
572     Table** dstLeafs = getDestinations();
573     ureg dstLeafMask = m_numDestinations - 1;
574     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
575     // Iterate over source range.
576     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
577         CellGroup* srcGroup = srcTable->getCellGroups() + ((srcIdx & srcSizeMask) >> 2);
578         Cell* srcCell = srcGroup->cells + (srcIdx & 3);
579         Hash srcHash;
580         Value srcValue;
581         // Fetch the srcHash and srcValue.
582         for (;;) {
583             srcHash = srcCell->hash.load(turf::Relaxed);
584             if (srcHash == KeyTraits::NullHash) {
585                 // An unused cell. Try to put a Redirect marker in its value.
586                 srcValue =
587                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
588                 if (srcValue == Value(ValueTraits::Redirect)) {
589                     // srcValue is already marked Redirect due to previous incomplete migration.
590                     TURF_TRACE(Grampa, 20, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
591                     break;
592                 }
593                 if (srcValue == Value(ValueTraits::NullValue))
594                     break; // Redirect has been placed. Break inner loop, continue outer loop.
595                 TURF_TRACE(Grampa, 21, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
596                 // Otherwise, somebody just claimed the cell. Read srcHash again...
597             } else {
598                 // Check for deleted/uninitialized value.
599                 srcValue = srcCell->value.load(turf::Relaxed);
600                 if (srcValue == Value(ValueTraits::NullValue)) {
601                     // Try to put a Redirect marker.
602                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
603                         break; // Redirect has been placed. Break inner loop, continue outer loop.
604                     TURF_TRACE(Grampa, 22, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
605                     if (srcValue == Value(ValueTraits::Redirect)) {
606                         // FIXME: I don't think this will happen. Investigate & change to assert
607                         TURF_TRACE(Grampa, 23, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
608                         break;
609                     }
610                 } else if (srcValue == Value(ValueTraits::Redirect)) {
611                     // srcValue is already marked Redirect due to previous incomplete migration.
612                     TURF_TRACE(Grampa, 24, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
613                     break;
614                 }
615
616                 // We've got a key/value pair to migrate.
617                 // Reserve a destination cell in dstTable.
618                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
619                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
620                 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
621                 ureg destLeafIndex = (srcHash >> safeShift) & dstLeafMask;
622                 Table* dstLeaf = dstLeafs[destLeafIndex];
623                 Cell* dstCell;
624                 ureg overflowIdx;
625                 InsertResult result = insert(srcHash, dstLeaf, dstLeaf->sizeMask, dstCell, overflowIdx);
626                 // During migration, a hash can only exist in one place among all the source tables,
627                 // and it is only migrated by one thread. Therefore, the hash will never already exist
628                 // in the destination table:
629                 TURF_ASSERT(result != InsertResult_AlreadyFound);
630                 if (result == InsertResult_Overflow) {
631                     // Destination overflow.
632                     // This can happen for several reasons. For example, the source table could have
633                     // consisted mostly of deleted cells when it overflowed, resulting in a small destination
634                     // table size, but then another thread could re-insert all the same hashes
635                     // before the migration completed.
636                     // Caller will cancel the current migration and begin a new one.
637                     return destLeafIndex;
638                 }
639                 // Migrate the old value to the new cell.
640                 for (;;) {
641                     // Copy srcValue to the destination.
642                     dstCell->value.store(srcValue, turf::Relaxed);
643                     // Try to place a Redirect marker in srcValue.
644                     Value doubleCheckedSrcValue =
645                         srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
646                     TURF_ASSERT(doubleCheckedSrcValue !=
647                                 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
648                     if (doubleCheckedSrcValue == srcValue) {
649                         // No racing writes to the src. We've successfully placed the Redirect marker.
650                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
651                         // by a late-arriving erase.
652                         if (srcValue == Value(ValueTraits::NullValue))
653                             TURF_TRACE(Grampa, 25, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
654                         break;
655                     }
656                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
657                     TURF_TRACE(Grampa, 26, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
658                     srcValue = doubleCheckedSrcValue;
659                 }
660                 // Cell successfully migrated. Proceed to next source cell.
661                 break;
662             }
663         }
664     }
665     // Range has been migrated successfully.
666     return -1;
667 }
668
669 template <class Map>
670 void Grampa<Map>::TableMigration::run() {
671     // Conditionally increment the shared # of workers.
672     ureg probeStatus = m_workerStatus.load(turf::Relaxed);
673     do {
674         if (probeStatus & 1) {
675             // End flag is already set, so do nothing.
676             TURF_TRACE(Grampa, 27, "[TableMigration::run] already ended", uptr(this), 0);
677             return;
678         }
679     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
680     // # of workers has been incremented, and the end flag is clear.
681     TURF_ASSERT((probeStatus & 1) == 0);
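    // Encoding note: m_workerStatus packs the end flag in bit 0 and the worker count in the
    // remaining bits, so each worker adds or removes 2 and the end flag is set with fetchOr(1).
    // For example, a value of 7 means "end flag set, 3 workers still running"; the last worker to
    // leave sees fetchSub(2) return 3 below and performs the post-migration step.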
682
683     // Iterate over all source tables.
684     Source* sources = getSources();
685     for (ureg s = 0; s < m_numSources; s++) {
686         Source& source = sources[s];
687         // Loop over all migration units in this source table.
688         for (;;) {
689             if (m_workerStatus.load(turf::Relaxed) & 1) {
690                 TURF_TRACE(Grampa, 28, "[TableMigration::run] detected end flag set", uptr(this), 0);
691                 goto endMigration;
692             }
693             ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
694             if (startIdx >= source.table->sizeMask + 1)
695                 break; // No more migration units in this table. Try next source table.
696             sreg overflowTableIndex = migrateRange(source.table, startIdx);
697             if (overflowTableIndex >= 0) {
698                 // *** FAILED MIGRATION ***
699                 // TableMigration failed due to destination table overflow.
700                 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
701                 // hence m_unitsRemaining won't reach zero.
702                 // However, multiple threads can independently detect a failed migration at the same time.
703                 TURF_TRACE(Grampa, 29, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
704                 // The reason we store overflowTableIndex in a shared variable is because we must flush all the worker threads
705                 // before we can safely deal with the overflow. Therefore, the thread that detects the failure is often
706                 // different from the thread that deals with it.
707                 // Store overflowTableIndex unconditionally; racing writes should be rare, and it doesn't matter which one wins.
708                 sreg oldIndex = m_overflowTableIndex.exchange(overflowTableIndex, turf::Relaxed);
709                 if (oldIndex >= 0)
710                     TURF_TRACE(Grampa, 30, "[TableMigration::run] race to set m_overflowTableIndex", uptr(overflowTableIndex),
711                                uptr(oldIndex));
712                 m_workerStatus.fetchOr(1, turf::Relaxed);
713                 goto endMigration;
714             }
715             sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
716             TURF_ASSERT(prevRemaining > 0);
717             if (prevRemaining == 1) {
718                 // *** SUCCESSFUL MIGRATION ***
719                 // That was the last chunk to migrate.
720                 m_workerStatus.fetchOr(1, turf::Relaxed);
721                 goto endMigration;
722             }
723         }
724     }
725     TURF_TRACE(Grampa, 31, "[TableMigration::run] out of migration units", uptr(this), 0);
726
727 endMigration:
728     // Decrement the shared # of workers.
729     probeStatus =
730         m_workerStatus.fetchSub(2, turf::AcquireRelease); // Ensure all modifications are visible to the thread that will publish
731     if (probeStatus >= 4) {
732         // There are other workers remaining. Return here so that only the very last worker will proceed.
733         TURF_TRACE(Grampa, 32, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
734         return;
735     }
736
737     // We're the very last worker thread.
738     // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
739     TURF_ASSERT(probeStatus == 3);
740     sreg overflowTableIndex = m_overflowTableIndex.loadNonatomic(); // No racing writes at this point
741     if (overflowTableIndex < 0) {
742         // The migration succeeded. This is the most likely outcome. Publish the new subtree.
743         m_map.publishTableMigration(this);
744         // End the jobCoordinator.
745         sources[0].table->jobCoordinator.end();
746     } else {
747         // The migration failed due to the overflow of a destination table.
748         Table* origTable = sources[0].table;
749         ureg count = ureg(1) << (origTable->unsafeRangeShift - getUnsafeShift());
750         ureg lo = overflowTableIndex & ~(count - 1);
751         TURF_ASSERT(lo + count <= m_numDestinations);
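        // Note on the arithmetic: each destination slot covers (1 << getUnsafeShift()) hashes, so
        // count is the number of consecutive destination slots spanned by origTable's hash range,
        // and lo rounds the overflowed slot index down to the start of that count-aligned span.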
752         turf::LockGuard<junction::striped::Mutex> guard(origTable->mutex);
753         SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
754         if (checkedJob != this) {
755             TURF_TRACE(Grampa, 33, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
756                        uptr(checkedJob));
757         } else {
758             TableMigration* migration;
759             Table* overflowedTable = getDestinations()[overflowTableIndex];
760             if (overflowedTable->sizeMask + 1 < LeafSize) {
761                 // The entire map is contained in a small table.
762                 TURF_TRACE(Grampa, 34, "[TableMigration::run] overflow occurred in a small map", uptr(origTable),
763                            uptr(checkedJob));
764                 TURF_ASSERT(overflowedTable->unsafeRangeShift == sizeof(Hash) * 8);
765                 TURF_ASSERT(overflowedTable->baseHash == 0);
766                 TURF_ASSERT(m_numDestinations == 1);
767                 TURF_ASSERT(m_baseHash == 0);
768                 migration = TableMigration::create(m_map, m_numSources + 1, 1);
769                 migration->m_baseHash = 0;
770                 migration->m_safeShift = 0;
771                 // Double the destination table size.
772                 migration->getDestinations()[0] = Table::create((overflowedTable->sizeMask + 1) * 2, overflowedTable->baseHash,
773                                                                 overflowedTable->unsafeRangeShift);
774             } else {
775                 // The overflowed table is already the size of a leaf. Split it into two ranges.
776                 if (count == 1) {
777                     TURF_TRACE(Grampa, 35, "[TableMigration::run] doubling subtree size after failure", uptr(origTable),
778                                uptr(checkedJob));
779                     migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations * 2);
780                     migration->m_baseHash = m_baseHash;
781                     migration->m_safeShift = getUnsafeShift() - 1;
782                     for (ureg i = 0; i < m_numDestinations; i++) {
783                         migration->getDestinations()[i * 2] = getDestinations()[i];
784                         migration->getDestinations()[i * 2 + 1] = getDestinations()[i];
785                     }
786                     count = 2;
787                 } else {
788                     TURF_TRACE(Grampa, 36, "[TableMigration::run] keeping same subtree size after failure", uptr(origTable),
789                                uptr(checkedJob));
790                     migration = TableMigration::create(m_map, m_numSources + 1, m_numDestinations);
791                     migration->m_baseHash = m_baseHash;
792                     migration->m_safeShift = m_safeShift;
793                     memcpy(migration->getDestinations(), getDestinations(), m_numDestinations * sizeof(Table*));
794                 }
795                 Table* splitTable1 = Table::create(LeafSize, origTable->baseHash, origTable->unsafeRangeShift - 1);
796                 ureg i = 0;
797                 for (; i < count / 2; i++) {
798                     migration->getDestinations()[lo + i] = splitTable1;
799                 }
800                 ureg halfNumHashes = ureg(1) << (origTable->unsafeRangeShift - 1);
801                 Table* splitTable2 =
802                     Table::create(LeafSize, origTable->baseHash + halfNumHashes, origTable->unsafeRangeShift - 1);
803                 for (; i < count; i++) {
804                     migration->getDestinations()[lo + i] = splitTable2;
805                 }
806             }
807             // Transfer source tables to the new migration.
808             for (ureg i = 0; i < m_numSources; i++) {
809                 migration->getSources()[i].table = getSources()[i].table;
810                 migration->getSources()[i].sourceIndex.storeNonatomic(0);
811                 getSources()[i].table = NULL;
812             }
813             migration->getSources()[m_numSources].table = overflowedTable;
814             migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
815             // Calculate total number of migration units to move.
816             ureg unitsRemaining = 0;
817             for (ureg s = 0; s < migration->m_numSources; s++)
818                 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
819             migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
820             // Publish the new migration.
821             origTable->jobCoordinator.storeRelease(migration);
822         }
823     }
824
825     // We're done with this TableMigration. Queue it for GC.
826     DefaultQSBR.enqueue(&TableMigration::destroy, this);
827 }
828
829 template <class Map>
830 void Grampa<Map>::FlatTreeMigration::run() {
831     // Conditionally increment the shared # of workers.
832     ureg probeStatus = m_workerStatus.load(turf::Relaxed);
833     do {
834         if (probeStatus & 1) {
835             // End flag is already set, so do nothing.
836             TURF_TRACE(Grampa, 37, "[FlatTreeMigration::run] already ended", uptr(this), 0);
837             return;
838         }
839     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
840     // # of workers has been incremented, and the end flag is clear.
841     TURF_ASSERT((probeStatus & 1) == 0);
842
843     // Loop over all migration units
844     ureg srcSize = (Hash(-1) >> m_source->safeShift) + 1;
845     // FIXME: Support migration to smaller flattrees
846     TURF_ASSERT(m_destination->safeShift < m_source->safeShift);
847     ureg repeat = ureg(1) << (m_source->safeShift - m_destination->safeShift);
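    // Worked example: growing a 4-entry flattree (safeShift == HASH_BITS - 2) into a 16-entry one
    // (safeShift == HASH_BITS - 4) gives repeat == 4, so each source pointer below is copied into
    // 4 consecutive destination slots.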
848     for (;;) {
849         ureg srcStart = m_sourceIndex.fetchAdd(FlatTreeMigrationUnitSize, turf::Relaxed);
850         if (srcStart >= srcSize)
851             break; // No more migration units in this flattree.
852         // Migrate this range
853         ureg srcEnd = turf::util::min(srcSize, srcStart + FlatTreeMigrationUnitSize);
854         ureg dst = srcStart * repeat;
855         for (ureg src = srcStart; src < srcEnd; src++) {
856             // Pointers in the source table can be changed at any time due to concurrent subtree publishing,
857             // so we need to exchange them with Redirect markers.
858             Table* t = m_source->getTables()[src].exchange((Table*) RedirectFlatTree, turf::Relaxed);
859             TURF_ASSERT(uptr(t) != RedirectFlatTree);
860             for (ureg r = repeat; r > 0; r--) {
861                 m_destination->getTables()[dst].storeNonatomic(t);
862                 dst++;
863             }
864         }
865         // Decrement m_unitsRemaining
866         sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
867         if (prevRemaining == 1) {
868             // *** SUCCESSFUL MIGRATION ***
869             // That was the last chunk to migrate.
870             m_workerStatus.fetchOr(1, turf::Relaxed);
871             break;
872         }
873     }
874
875     // Decrement the shared # of workers.
876     probeStatus = m_workerStatus.fetchSub(
877         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
878     if (probeStatus >= 4) {
879         // There are other workers remaining. Return here so that only the very last worker will proceed.
880         return;
881     }
882
883     // We're the very last worker thread.
884     // Publish the new flattree.
885     TURF_ASSERT(probeStatus == 3); // End flag must be set
886     m_map.publishFlatTreeMigration(this);
887     m_completed.signal();
888
889     // We're done with this FlatTreeMigration. Queue it for GC.
890     DefaultQSBR.enqueue(&FlatTreeMigration::destroy, this);
891 }
892
893 } // namespace details
894 } // namespace junction
895
896 #endif // JUNCTION_DETAILS_GRAMPA_H