// junction/details/Linear.h -- from junction.git (commit: "Add SingleMap_Leapfrog")
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
15
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
26
27 // Enable this to force migration overflows (for test purposes):
28 #define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
29
30 namespace junction {
31 namespace details {
32
33 TURF_TRACE_DECLARE(Linear, 27)
34
35 template <class Map>
36 struct Linear {
37     typedef typename Map::Hash Hash;
38     typedef typename Map::Value Value;
39     typedef typename Map::KeyTraits KeyTraits;
40     typedef typename Map::ValueTraits ValueTraits;
41
42     static const ureg InitialSize = 8;
43     static const ureg TableMigrationUnitSize = 32;
44     static const ureg CellsInUseSample = 256;
45
46     struct Cell {
47         turf::Atomic<Hash> hash;
48         turf::Atomic<Value> value;
49     };
50
51     struct Table {
52         const ureg sizeMask; // a power of two minus one
53         turf::Atomic<sreg> cellsRemaining;
54         turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
55         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
56
57         Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
58         }
59
60         static Table* create(ureg tableSize) {
61             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
62             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
63             new (table) Table(tableSize - 1);
64             for (ureg j = 0; j < tableSize; j++) {
65                 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
66                 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
67             }
68             return table;
69         }
70
71         void destroy() {
72             this->Table::~Table();
73             TURF_HEAP.free(this);
74         }
75
76         Cell* getCells() const {
77             return (Cell*) (this + 1);
78         }
79
80         ureg getNumMigrationUnits() const {
81             return sizeMask / TableMigrationUnitSize + 1;
82         }
83     };
84
85     class TableMigration : public SimpleJobCoordinator::Job {
86     public:
87         struct Source {
88             Table* table;
89             turf::Atomic<ureg> sourceIndex;
90         };
91
92         Map& m_map;
93         Table* m_destination;
94         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
95         turf::Atomic<bool> m_overflowed;
96         turf::Atomic<sreg> m_unitsRemaining;
97         ureg m_numSources;
98
99         TableMigration(Map& map) : m_map(map) {
100         }
101
102         static TableMigration* create(Map& map, ureg numSources) {
103             TableMigration* migration =
104                 (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
105             new (migration) TableMigration(map);
106             migration->m_workerStatus.storeNonatomic(0);
107             migration->m_overflowed.storeNonatomic(false);
108             migration->m_unitsRemaining.storeNonatomic(0);
109             migration->m_numSources = numSources;
110             // Caller is responsible for filling in sources & destination
111             return migration;
112         }
113
114         virtual ~TableMigration() TURF_OVERRIDE {
115         }
116
117         void destroy() {
118             // Destroy all source tables.
119             for (ureg i = 0; i < m_numSources; i++)
120                 if (getSources()[i].table)
121                     getSources()[i].table->destroy();
122             // Delete the migration object itself.
123             this->TableMigration::~TableMigration();
124             TURF_HEAP.free(this);
125         }
126
127         Source* getSources() const {
128             return (Source*) (this + 1);
129         }
130
131         bool migrateRange(Table* srcTable, ureg startIdx);
132         virtual void run() TURF_OVERRIDE;
133     };
134
135     static Cell* find(Hash hash, Table* table) {
136         TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
137         TURF_ASSERT(table);
138         TURF_ASSERT(hash != KeyTraits::NullHash);
139         ureg sizeMask = table->sizeMask;
140         for (ureg idx = ureg(hash);; idx++) {
141             idx &= sizeMask;
142             Cell* cell = table->getCells() + idx;
143             // Load the hash that was there.
144             Hash probeHash = cell->hash.load(turf::Relaxed);
145             if (probeHash == hash) {
146                 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
147                 return cell;
148             } else if (probeHash == KeyTraits::NullHash) {
149                 return NULL;
150             }
151         }
152     }
153
154     // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
155     enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
156     static InsertResult insertOrFind(Hash hash, Table* table, Cell*& cell) {
157         TURF_TRACE(Linear, 2, "[insertOrFind] called", uptr(table), hash);
158         TURF_ASSERT(table);
159         TURF_ASSERT(hash != KeyTraits::NullHash);
160         ureg sizeMask = table->sizeMask;
161
162         for (ureg idx = ureg(hash);; idx++) {
163             idx &= sizeMask;
164             cell = table->getCells() + idx;
165             // Load the existing hash.
166             Hash probeHash = cell->hash.load(turf::Relaxed);
167             if (probeHash == hash) {
168                 TURF_TRACE(Linear, 3, "[insertOrFind] found existing cell", uptr(table), idx);
169                 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
170             }
171             if (probeHash == KeyTraits::NullHash) {
172                 // It's an empty cell. Try to reserve it.
173                 // But first, decrement cellsRemaining to ensure we have permission to create new cells.
174                 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
175                 if (prevCellsRemaining <= 0) {
176                     // Table is overpopulated.
177                     TURF_TRACE(Linear, 4, "[insertOrFind] ran out of cellsRemaining", prevCellsRemaining, 0);
178                     table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
179                     return InsertResult_Overflow;
180                 }
181                 // Try to reserve this cell.
182                 Hash prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
183                 if (prevHash == KeyTraits::NullHash) {
184                     // Success. We reserved a new cell.
185                     TURF_TRACE(Linear, 5, "[insertOrFind] reserved cell", prevCellsRemaining, idx);
186                     return InsertResult_InsertedNew;
187                 }
188                 // There was a race and another thread reserved that cell from under us.
189                 TURF_TRACE(Linear, 6, "[insertOrFind] detected race to reserve cell", ureg(hash), idx);
190                 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
191                 if (prevHash == hash) {
192                     TURF_TRACE(Linear, 7, "[insertOrFind] race reserved same hash", ureg(hash), idx);
193                     return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
194                 }
195             }
196             // Try again in the next cell.
197         }
198     }
199
200     static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
201         // Create new migration by DCLI.
202         TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
203         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
204         if (job) {
205             TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
206         } else {
207             turf::LockGuard<turf::Mutex> guard(table->mutex);
208             job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
209             if (job) {
210                 TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
211             } else {
212                 // Create new migration.
213                 TableMigration* migration = TableMigration::create(map, 1);
214                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
215                 migration->getSources()[0].table = table;
216                 migration->getSources()[0].sourceIndex.storeNonatomic(0);
217                 migration->m_destination = Table::create(nextTableSize);
218                 // Publish the new migration.
219                 table->jobCoordinator.storeRelease(migration);
220             }
221         }
222     }
223
224     static void beginTableMigration(Map& map, Table* table, bool mustDouble) {
225         ureg nextTableSize;
226         if (mustDouble) {
227             TURF_TRACE(Linear, 11, "[beginTableMigration] forced to double", 0, 0);
228             nextTableSize = (table->sizeMask + 1) * 2;
229         } else {
230             // Estimate number of cells in use based on a small sample.
231             ureg idx = 0;
232             ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
233             ureg inUseCells = 0;
234             for (; idx < sampleSize; idx++) {
235                 Cell* cell = table->getCells() + idx;
236                 Value value = cell->value.load(turf::Relaxed);
237                 if (value == Value(ValueTraits::Redirect)) {
238                     // Another thread kicked off the jobCoordinator. The caller will participate upon return.
239                     TURF_TRACE(Linear, 12, "[beginTableMigration] redirected while determining table size", 0, 0);
240                     return;
241                 }
242                 if (value != Value(ValueTraits::NullValue))
243                     inUseCells++;
244             }
245             float inUseRatio = float(inUseCells) / sampleSize;
246             float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
247 #if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
248             // Periodically underestimate the number of cells in use.
249             // This exercises the code that handles overflow during migration.
250             static ureg counter = 1;
251             if ((++counter & 3) == 0) {
252                 estimatedInUse /= 4;
253             }
254 #endif
255             nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
256         }
257         beginTableMigrationToSize(map, table, nextTableSize);
258     }
259 }; // Linear
260
261 template <class Map>
262 bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
263     ureg srcSizeMask = srcTable->sizeMask;
264     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
265     // Iterate over source range.
266     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
267         Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
268         Hash srcHash;
269         Value srcValue;
270         // Fetch the srcHash and srcValue.
271         for (;;) {
272             srcHash = srcCell->hash.load(turf::Relaxed);
273             if (srcHash == KeyTraits::NullHash) {
274                 // An unused cell. Try to put a Redirect marker in its value.
275                 srcValue =
276                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
277                 if (srcValue == Value(ValueTraits::Redirect)) {
278                     // srcValue is already marked Redirect due to previous incomplete migration.
279                     TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
280                     break;
281                 }
282                 if (srcValue == Value(ValueTraits::NullValue))
283                     break; // Redirect has been placed. Break inner loop, continue outer loop.
284                 TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
285                 // Otherwise, somebody just claimed the cell. Read srcHash again...
286             } else {
287                 // Check for deleted/uninitialized value.
288                 srcValue = srcCell->value.load(turf::Relaxed);
289                 if (srcValue == Value(ValueTraits::NullValue)) {
290                     // Try to put a Redirect marker.
291                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
292                         break; // Redirect has been placed. Break inner loop, continue outer loop.
293                     TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
294                     if (srcValue == Value(ValueTraits::Redirect)) {
295                         // FIXME: I don't think this will happen. Investigate & change to assert
296                         TURF_TRACE(Linear, 16, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
297                         break;
298                     }
299                 } else if (srcValue == Value(ValueTraits::Redirect)) {
300                     // srcValue is already marked Redirect due to previous incomplete migration.
301                     TURF_TRACE(Linear, 17, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
302                     break;
303                 }
304
305                 // We've got a key/value pair to migrate.
306                 // Reserve a destination cell in the destination.
307                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
308                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
309                 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
310                 Cell* dstCell;
311                 InsertResult result = insertOrFind(srcHash, m_destination, dstCell);
312                 // During migration, a hash can only exist in one place among all the source tables,
313                 // and it is only migrated by one thread. Therefore, the hash will never already exist
314                 // in the destination table:
315                 TURF_ASSERT(result != InsertResult_AlreadyFound);
316                 if (result == InsertResult_Overflow) {
317                     // Destination overflow.
318                     // This can happen for several reasons. For example, the source table could have
319                     // existed of all deleted cells when it overflowed, resulting in a small destination
320                     // table size, but then another thread could re-insert all the same hashes
321                     // before the migration completed.
322                     // Caller will cancel the current migration and begin a new one.
323                     return false;
324                 }
325                 // Migrate the old value to the new cell.
326                 for (;;) {
327                     // Copy srcValue to the destination.
328                     dstCell->value.store(srcValue, turf::Relaxed);
329                     // Try to place a Redirect marker in srcValue.
330                     Value doubleCheckedSrcValue =
331                         srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
332                     TURF_ASSERT(doubleCheckedSrcValue !=
333                                 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
334                     if (doubleCheckedSrcValue == srcValue) {
335                         // No racing writes to the src. We've successfully placed the Redirect marker.
336                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
337                         // by a late-arriving erase.
338                         if (srcValue == Value(ValueTraits::NullValue))
339                             TURF_TRACE(Linear, 18, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
340                         break;
341                     }
342                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
343                     TURF_TRACE(Linear, 19, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
344                     srcValue = doubleCheckedSrcValue;
345                 }
346                 // Cell successfully migrated. Proceed to next source cell.
347                 break;
348             }
349         }
350     }
351     // Range has been migrated successfully.
352     return true;
353 }
354
355 template <class Map>
356 void Linear<Map>::TableMigration::run() {
357     // Conditionally increment the shared # of workers.
358     ureg probeStatus = m_workerStatus.load(turf::Relaxed);
359     do {
360         if (probeStatus & 1) {
361             // End flag is already set, so do nothing.
362             TURF_TRACE(Linear, 20, "[TableMigration::run] already ended", uptr(this), 0);
363             return;
364         }
365     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
366     // # of workers has been incremented, and the end flag is clear.
367     TURF_ASSERT((probeStatus & 1) == 0);
368
369     // Iterate over all source tables.
370     for (ureg s = 0; s < m_numSources; s++) {
371         Source& source = getSources()[s];
372         // Loop over all migration units in this source table.
373         for (;;) {
374             if (m_workerStatus.load(turf::Relaxed) & 1) {
375                 TURF_TRACE(Linear, 21, "[TableMigration::run] detected end flag set", uptr(this), 0);
376                 goto endMigration;
377             }
378             ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
379             if (startIdx >= source.table->sizeMask + 1)
380                 break; // No more migration units in this table. Try next source table.
381             bool overflowed = !migrateRange(source.table, startIdx);
382             if (overflowed) {
383                 // *** FAILED MIGRATION ***
384                 // TableMigration failed due to destination table overflow.
385                 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
386                 // hence m_unitsRemaining won't reach zero.
387                 // However, multiple threads can independently detect a failed migration at the same time.
388                 TURF_TRACE(Linear, 22, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
389                 // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
390                 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
391                 // the thread
392                 // that deals with it.
393                 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
394                 if (oldOverflowed)
395                     TURF_TRACE(Linear, 23, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
396                                uptr(oldOverflowed));
397                 m_workerStatus.fetchOr(1, turf::Relaxed);
398                 goto endMigration;
399             }
400             sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
401             TURF_ASSERT(prevRemaining > 0);
402             if (prevRemaining == 1) {
403                 // *** SUCCESSFUL MIGRATION ***
404                 // That was the last chunk to migrate.
405                 m_workerStatus.fetchOr(1, turf::Relaxed);
406                 goto endMigration;
407             }
408         }
409     }
410     TURF_TRACE(Linear, 24, "[TableMigration::run] out of migration units", uptr(this), 0);
411
412 endMigration:
413     // Decrement the shared # of workers.
414     probeStatus = m_workerStatus.fetchSub(
415         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
416     if (probeStatus >= 4) {
417         // There are other workers remaining. Return here so that only the very last worker will proceed.
418         TURF_TRACE(Linear, 25, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
419         return;
420     }
421
422     // We're the very last worker thread.
423     // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
424     TURF_ASSERT(probeStatus == 3);
425     bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
426     if (!overflowed) {
427         // The migration succeeded. This is the most likely outcome. Publish the new subtree.
428         m_map.publishTableMigration(this);
429         // End the jobCoodinator.
430         getSources()[0].table->jobCoordinator.end();
431     } else {
432         // The migration failed due to the overflow of the destination table.
433         Table* origTable = getSources()[0].table;
434         turf::LockGuard<turf::Mutex> guard(origTable->mutex);
435         SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
436         if (checkedJob != this) {
437             TURF_TRACE(Linear, 26, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
438                        uptr(checkedJob));
439         } else {
440             TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
441             // Double the destination table size.
442             migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
443             // Transfer source tables to the new migration.
444             for (ureg i = 0; i < m_numSources; i++) {
445                 migration->getSources()[i].table = getSources()[i].table;
446                 getSources()[i].table = NULL;
447                 migration->getSources()[i].sourceIndex.storeNonatomic(0);
448             }
449             migration->getSources()[m_numSources].table = m_destination;
450             migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
451             // Calculate total number of migration units to move.
452             ureg unitsRemaining = 0;
453             for (ureg s = 0; s < migration->m_numSources; s++)
454                 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
455             migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
456             // Publish the new migration.
457             origTable->jobCoordinator.storeRelease(migration);
458         }
459     }
460
461     // We're done with this TableMigration. Queue it for GC.
462     DefaultQSBR.enqueue(&TableMigration::destroy, this);
463 }
464
465 } // namespace details
466 } // namespace junction
467
468 #endif // JUNCTION_DETAILS_LINEAR_H