40fc450980b62cdc8212768934a9f0b725e76abd
[junction.git] / junction / details / Linear.h
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
15
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
26
27 // Enable this to force migration overflows (for test purposes):
28 #define JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS 0
29
30 namespace junction {
31 namespace details {
32
33 TURF_TRACE_DECLARE(Linear, 26)
34
35 template <class Map>
36 struct Linear {
37     typedef typename Map::Hash Hash;
38     typedef typename Map::Value Value;
39     typedef typename Map::KeyTraits KeyTraits;
40     typedef typename Map::ValueTraits ValueTraits;
41
42     static const ureg InitialSize = 8;
43     static const ureg TableMigrationUnitSize = 32;
44     static const ureg CellsInUseSample = 256;
45
46     struct Cell {
47         turf::Atomic<Hash> hash;
48         turf::Atomic<Value> value;
49     };
50
51     struct Table {
52         const ureg sizeMask; // a power of two minus one
53         turf::Atomic<sreg> cellsRemaining;
54         turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
55         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
56
57         Table(ureg sizeMask) : sizeMask(sizeMask), cellsRemaining(sreg(sizeMask * 0.75f)) {
58         }
59
60         static Table* create(ureg tableSize) {
61             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
62             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
63             new (table) Table(tableSize - 1);
64             for (ureg j = 0; j < tableSize; j++) {
65                 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
66                 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
67             }
68             return table;
69         }
70
71         void destroy() {
72             this->Table::~Table();
73             TURF_HEAP.free(this);
74         }
75
76         Cell* getCells() const {
77             return (Cell*) (this + 1);
78         }
79
80         ureg getNumMigrationUnits() const {
81             return sizeMask / TableMigrationUnitSize + 1;
82         }
83     };
84
85     class TableMigration : public SimpleJobCoordinator::Job {
86     public:
87         struct Source {
88             Table* table;
89             turf::Atomic<ureg> sourceIndex;
90         };
91
92         Map& m_map;
93         Table* m_destination;
94         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
95         turf::Atomic<bool> m_overflowed;
96         turf::Atomic<sreg> m_unitsRemaining;
97         ureg m_numSources;
98
99         TableMigration(Map& map) : m_map(map) {
100         }
101
102         static TableMigration* create(Map& map, ureg numSources) {
103             TableMigration* migration =
104                 (TableMigration*) TURF_HEAP.alloc(sizeof(TableMigration) + sizeof(TableMigration::Source) * numSources);
105             new (migration) TableMigration(map);
106             migration->m_workerStatus.storeNonatomic(0);
107             migration->m_overflowed.storeNonatomic(false);
108             migration->m_unitsRemaining.storeNonatomic(0);
109             migration->m_numSources = numSources;
110             // Caller is responsible for filling in sources & destination
111             return migration;
112         }
113
114         virtual ~TableMigration() TURF_OVERRIDE {
115         }
116
117         void destroy() {
118             // Destroy all source tables.
119             for (ureg i = 0; i < m_numSources; i++)
120                 if (getSources()[i].table)
121                     getSources()[i].table->destroy();
122             // Delete the migration object itself.
123             this->TableMigration::~TableMigration();
124             TURF_HEAP.free(this);
125         }
126
127         Source* getSources() const {
128             return (Source*) (this + 1);
129         }
130
131         bool migrateRange(Table* srcTable, ureg startIdx);
132         virtual void run() TURF_OVERRIDE;
133     };
134
135     static Cell* find(Hash hash, Table* table) {
136         TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
137         TURF_ASSERT(table);
138         TURF_ASSERT(hash != KeyTraits::NullHash);
139         ureg sizeMask = table->sizeMask;
140         for (ureg idx = ureg(hash);; idx++) {
141             idx &= sizeMask;
142             Cell* cell = table->getCells() + idx;
143             // Load the hash that was there.
144             Hash probeHash = cell->hash.load(turf::Relaxed);
145             if (probeHash == hash) {
146                 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
147                 return cell;
148             } else if (probeHash == KeyTraits::NullHash) {
149                 return NULL;
150             }
151         }
152     }
153
154     // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
155     enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
156     static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
157         TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
158         TURF_ASSERT(table);
159         TURF_ASSERT(hash != KeyTraits::NullHash);
160         ureg sizeMask = table->sizeMask;
161
162         for (ureg idx = ureg(hash);; idx++) {
163             idx &= sizeMask;
164             cell = table->getCells() + idx;
165             // Load the existing hash.
166             Hash probeHash = cell->hash.load(turf::Relaxed);
167             if (probeHash == hash) {
168                 TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
169                 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
170             }
171             if (probeHash == KeyTraits::NullHash) {
172                 // It's an empty cell. Try to reserve it.
173                 // But first, decrement cellsRemaining to ensure we have permission to create new cells.
174                 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
175                 if (prevCellsRemaining <= 0) {
176                     // Table is overpopulated.
177                     TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
178                     table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
179                     return InsertResult_Overflow;
180                 }
181                 // Try to reserve this cell.
182                 Hash prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
183                 if (prevHash == KeyTraits::NullHash) {
184                     // Success. We reserved a new cell.
185                     TURF_TRACE(Linear, 5, "[insert] reserved cell", prevCellsRemaining, idx);
186                     return InsertResult_InsertedNew;
187                 }
188                 // There was a race and another thread reserved that cell from under us.
189                 TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
190                 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
191                 if (prevHash == hash) {
192                     TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
193                     return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
194                 }
195             }
196             // Try again in the next cell.
197         }
198     }
199
200     static void beginTableMigrationToSize(Map& map, Table* table, ureg nextTableSize) {
201         // Create new migration by DCLI.
202         TURF_TRACE(Linear, 8, "[beginTableMigrationToSize] called", 0, 0);
203         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
204         if (job) {
205             TURF_TRACE(Linear, 9, "[beginTableMigrationToSize] new migration already exists", 0, 0);
206         } else {
207             turf::LockGuard<turf::Mutex> guard(table->mutex);
208             job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
209             if (job) {
210                 TURF_TRACE(Linear, 10, "[beginTableMigrationToSize] new migration already exists (double-checked)", 0, 0);
211             } else {
212                 // Create new migration.
213                 TableMigration* migration = TableMigration::create(map, 1);
214                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
215                 migration->getSources()[0].table = table;
216                 migration->getSources()[0].sourceIndex.storeNonatomic(0);
217                 migration->m_destination = Table::create(nextTableSize);
218                 // Publish the new migration.
219                 table->jobCoordinator.storeRelease(migration);
220             }
221         }
222     }
223
224     static void beginTableMigration(Map& map, Table* table) {
225         // Estimate number of cells in use based on a small sample.
226         ureg idx = 0;
227         ureg sampleSize = turf::util::min<ureg>(table->sizeMask + 1, CellsInUseSample);
228         ureg inUseCells = 0;
229         for (; idx < sampleSize; idx++) {
230             Cell* cell = table->getCells() + idx;
231             Value value = cell->value.load(turf::Relaxed);
232             if (value == Value(ValueTraits::Redirect)) {
233                 // Another thread kicked off the jobCoordinator. The caller will participate upon return.
234                 TURF_TRACE(Linear, 11, "[beginTableMigration] redirected while determining table size", 0, 0);
235                 return;
236             }
237             if (value != Value(ValueTraits::NullValue))
238                 inUseCells++;
239         }
240         float inUseRatio = float(inUseCells) / sampleSize;
241         float estimatedInUse = (table->sizeMask + 1) * inUseRatio;
242 #if JUNCTION_LINEAR_FORCE_MIGRATION_OVERFLOWS
243         // Periodically underestimate the number of cells in use.
244         // This exercises the code that handles overflow during migration.
245         static ureg counter = 1;
246         if ((++counter & 3) == 0) {
247             estimatedInUse /= 4;
248         }
249 #endif
250         ureg nextTableSize = turf::util::max(InitialSize, turf::util::roundUpPowerOf2(ureg(estimatedInUse * 2)));
251         beginTableMigrationToSize(map, table, nextTableSize);
252     }
253 }; // Linear
254
255 template <class Map>
256 bool Linear<Map>::TableMigration::migrateRange(Table* srcTable, ureg startIdx) {
257     ureg srcSizeMask = srcTable->sizeMask;
258     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
259     sreg valuesMigrated = 0;
260     // Iterate over source range.
261     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
262         Cell* srcCell = srcTable->getCells() + (srcIdx & srcSizeMask);
263         Hash srcHash;
264         Value srcValue;
265         // Fetch the srcHash and srcValue.
266         for (;;) {
267             srcHash = srcCell->hash.load(turf::Relaxed);
268             if (srcHash == KeyTraits::NullHash) {
269                 // An unused cell. Try to put a Redirect marker in its value.
270                 srcValue =
271                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
272                 if (srcValue == Value(ValueTraits::Redirect)) {
273                     // srcValue is already marked Redirect due to previous incomplete migration.
274                     TURF_TRACE(Linear, 12, "[migrateRange] empty cell already redirected", uptr(srcTable), srcIdx);
275                     break;
276                 }
277                 if (srcValue == Value(ValueTraits::NullValue))
278                     break; // Redirect has been placed. Break inner loop, continue outer loop.
279                 TURF_TRACE(Linear, 13, "[migrateRange] race to insert key", uptr(srcTable), srcIdx);
280                 // Otherwise, somebody just claimed the cell. Read srcHash again...
281             } else {
282                 // Check for deleted/uninitialized value.
283                 srcValue = srcCell->value.load(turf::Relaxed);
284                 if (srcValue == Value(ValueTraits::NullValue)) {
285                     // Try to put a Redirect marker.
286                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
287                         break; // Redirect has been placed. Break inner loop, continue outer loop.
288                     TURF_TRACE(Linear, 14, "[migrateRange] race to insert value", uptr(srcTable), srcIdx);
289                     if (srcValue == Value(ValueTraits::Redirect)) {
290                         // FIXME: I don't think this will happen. Investigate & change to assert
291                         TURF_TRACE(Linear, 15, "[migrateRange] race inserted Redirect", uptr(srcTable), srcIdx);
292                         break;
293                     }
294                 } else if (srcValue == Value(ValueTraits::Redirect)) {
295                     // srcValue is already marked Redirect due to previous incomplete migration.
296                     TURF_TRACE(Linear, 16, "[migrateRange] in-use cell already redirected", uptr(srcTable), srcIdx);
297                     break;
298                 }
299
300                 // We've got a key/value pair to migrate.
301                 // Reserve a destination cell in the destination.
302                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
303                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
304                 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect));
305                 Cell* dstCell;
306                 InsertResult result = insert(srcHash, m_destination, dstCell);
307                 // During migration, a hash can only exist in one place among all the source tables,
308                 // and it is only migrated by one thread. Therefore, the hash will never already exist
309                 // in the destination table:
310                 TURF_ASSERT(result != InsertResult_AlreadyFound);
311                 if (result == InsertResult_Overflow) {
312                     // Destination overflow.
313                     // This can happen for several reasons. For example, the source table could have
314                     // existed of all deleted cells when it overflowed, resulting in a small destination
315                     // table size, but then another thread could re-insert all the same hashes
316                     // before the migration completed.
317                     // Caller will cancel the current migration and begin a new one.
318                     return false;
319                 }
320                 // Migrate the old value to the new cell.
321                 for (;;) {
322                     // Copy srcValue to the destination.
323                     dstCell->value.store(srcValue, turf::Relaxed);
324                     // Try to place a Redirect marker in srcValue.
325                     Value doubleCheckedSrcValue =
326                         srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
327                     TURF_ASSERT(doubleCheckedSrcValue !=
328                                 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
329                     if (doubleCheckedSrcValue == srcValue) {
330                         // No racing writes to the src. We've successfully placed the Redirect marker.
331                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
332                         // by a late-arriving erase.
333                         if (srcValue == Value(ValueTraits::NullValue))
334                             TURF_TRACE(Linear, 17, "[migrateRange] racing update was erase", uptr(srcTable), srcIdx);
335                         break;
336                     }
337                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
338                     TURF_TRACE(Linear, 18, "[migrateRange] race to update migrated value", uptr(srcTable), srcIdx);
339                     srcValue = doubleCheckedSrcValue;
340                 }
341                 // Cell successfully migrated. Proceed to next source cell.
342                 break;
343             }
344         }
345     }
346     // Range has been migrated successfully.
347     return true;
348 }
349
350 template <class Map>
351 void Linear<Map>::TableMigration::run() {
352     // Conditionally increment the shared # of workers.
353     ureg probeStatus = m_workerStatus.load(turf::Relaxed);
354     do {
355         if (probeStatus & 1) {
356             // End flag is already set, so do nothing.
357             TURF_TRACE(Linear, 19, "[TableMigration::run] already ended", uptr(this), 0);
358             return;
359         }
360     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
361     // # of workers has been incremented, and the end flag is clear.
362     TURF_ASSERT((probeStatus & 1) == 0);
363
364     // Iterate over all source tables.
365     for (ureg s = 0; s < m_numSources; s++) {
366         Source& source = getSources()[s];
367         // Loop over all migration units in this source table.
368         for (;;) {
369             if (m_workerStatus.load(turf::Relaxed) & 1) {
370                 TURF_TRACE(Linear, 20, "[TableMigration::run] detected end flag set", uptr(this), 0);
371                 goto endMigration;
372             }
373             ureg startIdx = source.sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
374             if (startIdx >= source.table->sizeMask + 1)
375                 break; // No more migration units in this table. Try next source table.
376             bool overflowed = !migrateRange(source.table, startIdx);
377             if (overflowed) {
378                 // *** FAILED MIGRATION ***
379                 // TableMigration failed due to destination table overflow.
380                 // No other thread can declare the migration successful at this point, because *this* unit will never complete,
381                 // hence m_unitsRemaining won't reach zero.
382                 // However, multiple threads can independently detect a failed migration at the same time.
383                 TURF_TRACE(Linear, 21, "[TableMigration::run] destination overflow", uptr(source.table), uptr(startIdx));
384                 // The reason we store overflowed in a shared variable is because we can must flush all the worker threads before
385                 // we can safely deal with the overflow. Therefore, the thread that detects the failure is often different from
386                 // the thread
387                 // that deals with it.
388                 bool oldOverflowed = m_overflowed.exchange(overflowed, turf::Relaxed);
389                 if (oldOverflowed)
390                     TURF_TRACE(Linear, 22, "[TableMigration::run] race to set m_overflowed", uptr(overflowed),
391                                uptr(oldOverflowed));
392                 m_workerStatus.fetchOr(1, turf::Relaxed);
393                 goto endMigration;
394             }
395             sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
396             TURF_ASSERT(prevRemaining > 0);
397             if (prevRemaining == 1) {
398                 // *** SUCCESSFUL MIGRATION ***
399                 // That was the last chunk to migrate.
400                 m_workerStatus.fetchOr(1, turf::Relaxed);
401                 goto endMigration;
402             }
403         }
404     }
405     TURF_TRACE(Linear, 23, "[TableMigration::run] out of migration units", uptr(this), 0);
406
407 endMigration:
408     // Decrement the shared # of workers.
409     probeStatus = m_workerStatus.fetchSub(
410         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
411     if (probeStatus >= 4) {
412         // There are other workers remaining. Return here so that only the very last worker will proceed.
413         TURF_TRACE(Linear, 24, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
414         return;
415     }
416
417     // We're the very last worker thread.
418     // Perform the appropriate post-migration step depending on whether the migration succeeded or failed.
419     TURF_ASSERT(probeStatus == 3);
420     bool overflowed = m_overflowed.loadNonatomic(); // No racing writes at this point
421     if (!overflowed) {
422         // The migration succeeded. This is the most likely outcome. Publish the new subtree.
423         m_map.publishTableMigration(this);
424         // End the jobCoodinator.
425         getSources()[0].table->jobCoordinator.end();
426     } else {
427         // The migration failed due to the overflow of the destination table.
428         Table* origTable = getSources()[0].table;
429         turf::LockGuard<turf::Mutex> guard(origTable->mutex);
430         SimpleJobCoordinator::Job* checkedJob = origTable->jobCoordinator.loadConsume();
431         if (checkedJob != this) {
432             TURF_TRACE(Linear, 25, "[TableMigration::run] a new TableMigration was already started", uptr(origTable),
433                        uptr(checkedJob));
434         } else {
435             TableMigration* migration = TableMigration::create(m_map, m_numSources + 1);
436             // Double the destination table size.
437             migration->m_destination = Table::create((m_destination->sizeMask + 1) * 2);
438             // Transfer source tables to the new migration.
439             for (ureg i = 0; i < m_numSources; i++) {
440                 migration->getSources()[i].table = getSources()[i].table;
441                 getSources()[i].table = NULL;
442                 migration->getSources()[i].sourceIndex.storeNonatomic(0);
443             }
444             migration->getSources()[m_numSources].table = m_destination;
445             migration->getSources()[m_numSources].sourceIndex.storeNonatomic(0);
446             // Calculate total number of migration units to move.
447             ureg unitsRemaining = 0;
448             for (ureg s = 0; s < migration->m_numSources; s++)
449                 unitsRemaining += migration->getSources()[s].table->getNumMigrationUnits();
450             migration->m_unitsRemaining.storeNonatomic(unitsRemaining);
451             // Publish the new migration.
452             origTable->jobCoordinator.storeRelease(migration);
453         }
454     }
455
456     // We're done with this TableMigration. Queue it for GC.
457     DefaultQSBR.enqueue(&TableMigration::destroy, this);
458 }
459
460 } // namespace details
461 } // namespace junction
462
463 #endif // JUNCTION_DETAILS_LINEAR_H