d2ee367dce8a0d0e3ceaa926e04b5b2f1cb1eb61
[junction.git] / junction / details / Linear.h
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_DETAILS_LINEAR_H
14 #define JUNCTION_DETAILS_LINEAR_H
15
16 #include <junction/Core.h>
17 #include <turf/Atomic.h>
18 #include <turf/Mutex.h>
19 #include <turf/ManualResetEvent.h>
20 #include <turf/Util.h>
21 #include <junction/MapTraits.h>
22 #include <turf/Trace.h>
23 #include <turf/Heap.h>
24 #include <junction/SimpleJobCoordinator.h>
25 #include <junction/QSBR.h>
26
27 namespace junction {
28 namespace details {
29
30 TURF_TRACE_DECLARE(Linear, 22)
31
32 template <class Map>
33 struct Linear {
34     typedef typename Map::Hash Hash; // Type aliases re-exported from the Map template parameter.
35     typedef typename Map::Value Value;
36     typedef typename Map::KeyTraits KeyTraits;
37     typedef typename Map::ValueTraits ValueTraits;
38
39     static const ureg InitialSize = 8; // NOTE(review): not referenced in this header; presumably the first table size chosen by the owning map - confirm
40     static const ureg TableMigrationUnitSize = 32; // Number of source cells covered by one migrateRange() call
41     static const ureg LinearSearchLimit = 128; // NOTE(review): find()/insert() in this header never consult this limit - confirm where it is used
42     static const ureg CellsInUseSample = LinearSearchLimit;
43     TURF_STATIC_ASSERT(LinearSearchLimit > 0 && LinearSearchLimit < 256);              // Must be non-zero and fit in 8 bits. NOTE(review): the earlier note cited CellGroup::links, but no CellGroup is declared in this header - confirm
44     TURF_STATIC_ASSERT(CellsInUseSample > 0 && CellsInUseSample <= LinearSearchLimit); // Limit sample to failed search chain
45
46     struct Cell { // One table slot: a hash and its value, each stored in its own atomic.
47         turf::Atomic<Hash> hash; // KeyTraits::NullHash until insert() reserves the cell for a hash
48         turf::Atomic<Value> value; // ValueTraits::NullValue after Table::create(); migrateRange() replaces it with ValueTraits::Redirect
49     };
50
51     struct Table {
52         const ureg sizeMask; // a power of two minus one
53         const ureg limitNumValues;
54         turf::Atomic<sreg> cellsRemaining;
55         turf::Atomic<sreg> valuesRemaining;
56         turf::Mutex mutex;                   // to DCLI the TableMigration (stored in the jobCoordinator)
57         SimpleJobCoordinator jobCoordinator; // makes all blocked threads participate in the migration
58
59         Table(ureg sizeMask, ureg limitNumValues)
60             : sizeMask(sizeMask), limitNumValues(limitNumValues), cellsRemaining(limitNumValues),
61               valuesRemaining(limitNumValues) {
62         }
63
64         static Table* create(ureg tableSize, ureg limitNumValues) {
65             TURF_ASSERT(turf::util::isPowerOf2(tableSize));
66             Table* table = (Table*) TURF_HEAP.alloc(sizeof(Table) + sizeof(Cell) * tableSize);
67             new (table) Table(tableSize - 1, limitNumValues);
68             for (ureg j = 0; j < tableSize; j++) {
69                 table->getCells()[j].hash.storeNonatomic(KeyTraits::NullHash);
70                 table->getCells()[j].value.storeNonatomic(Value(ValueTraits::NullValue));
71             }
72             return table;
73         }
74
75         void destroy() {
76             this->Table::~Table();
77             TURF_HEAP.free(this);
78         }
79
80         Cell* getCells() const {
81             return (Cell*) (this + 1);
82         }
83
84         ureg getNumMigrationUnits() const {
85             return sizeMask / TableMigrationUnitSize + 1;
86         }
87     };
88
89     class TableMigration : public SimpleJobCoordinator::Job {
90     public:
91         Map& m_map;
92         Table* m_source;
93         turf::Atomic<ureg> m_sourceIndex;
94         Table* m_destination;
95         turf::Atomic<ureg> m_workerStatus; // number of workers + end flag
96         turf::Atomic<sreg> m_unitsRemaining;
97
98         TableMigration(Map& map) : m_map(map), m_sourceIndex(0), m_workerStatus(0), m_unitsRemaining(0) {
99             // Caller is responsible for filling in source & destination
100         }
101
102         virtual ~TableMigration() TURF_OVERRIDE {
103             // Destroy source table.
104             m_source->destroy();
105         }
106
107         void destroy() {
108             delete this;
109         }
110
111         bool migrateRange(ureg startIdx);
112         virtual void run() TURF_OVERRIDE;
113     };
114
115     static Cell* find(Hash hash, Table* table) {
116         TURF_TRACE(Linear, 0, "[find] called", uptr(table), hash);
117         TURF_ASSERT(table);
118         TURF_ASSERT(hash != KeyTraits::NullHash);
119         ureg sizeMask = table->sizeMask;
120         for (ureg idx = hash;; idx++) {
121             idx &= sizeMask;
122             Cell* cell = table->getCells() + idx;
123             // Load the hash that was there.
124             uptr probeHash = cell->hash.load(turf::Relaxed);
125             if (probeHash == hash) {
126                 TURF_TRACE(Linear, 1, "[find] found existing cell", uptr(table), idx);
127                 return cell;
128             } else if (probeHash == KeyTraits::NullHash) {
129                 return NULL;
130             }
131         }
132     }
133
134     // FIXME: Possible optimization: Dedicated insert for migration? It wouldn't check for InsertResult_AlreadyFound.
135     enum InsertResult { InsertResult_AlreadyFound, InsertResult_InsertedNew, InsertResult_Overflow };
136     static InsertResult insert(Hash hash, Table* table, Cell*& cell) {
137         TURF_TRACE(Linear, 2, "[insert] called", uptr(table), hash);
138         TURF_ASSERT(table);
139         TURF_ASSERT(hash != KeyTraits::NullHash);
140         ureg sizeMask = table->sizeMask;
141
142         for (ureg idx = hash;; idx++) {
143             idx &= sizeMask;
144             cell = table->getCells() + idx;
145             // Load the existing hash.
146             uptr probeHash = cell->hash.load(turf::Relaxed);
147             if (probeHash == hash) {
148                 TURF_TRACE(Linear, 3, "[insert] found existing cell", uptr(table), idx);
149                 return InsertResult_AlreadyFound; // Key found in table. Return the existing cell.
150             }
151             if (probeHash == KeyTraits::NullHash) {
152                 // It's an empty cell. Try to reserve it.
153                 // But first, decrement cellsRemaining to ensure we have permission to create new getCells().
154                 s32 prevCellsRemaining = table->cellsRemaining.fetchSub(1, turf::Relaxed);
155                 if (prevCellsRemaining <= 0) {
156                     // Table is overpopulated.
157                     TURF_TRACE(Linear, 4, "[insert] ran out of cellsRemaining", prevCellsRemaining, 0);
158                     table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
159                     return InsertResult_Overflow;
160                 }
161                 // Try to reserve this cell.
162                 uptr prevHash = cell->hash.compareExchange(KeyTraits::NullHash, hash, turf::Relaxed);
163                 if (prevHash == KeyTraits::NullHash) {
164                     // Success. We reserved a new cell.
165                     TURF_TRACE(Linear, 5, "[insert] reserved cell", prevCellsRemaining, idx);
166                     return InsertResult_InsertedNew;
167                 }
168                 // There was a race and another thread reserved that cell from under us.
169                 TURF_TRACE(Linear, 6, "[insert] detected race to reserve cell", ureg(hash), idx);
170                 table->cellsRemaining.fetchAdd(1, turf::Relaxed); // Undo cellsRemaining decrement
171                 if (prevHash == hash) {
172                     TURF_TRACE(Linear, 7, "[insert] race reserved same hash", ureg(hash), idx);
173                     return InsertResult_AlreadyFound; // They inserted the same key. Return the existing cell.
174                 }
175             }
176             // Try again in the next cell.
177         }
178     }
179
180     static void beginTableMigration(Map& map, Table* table) {
181         // Create new migration by DCLI.
182         TURF_TRACE(Linear, 8, "[beginTableMigration] called", 0, 0);
183         SimpleJobCoordinator::Job* job = table->jobCoordinator.loadConsume();
184         if (job) {
185             TURF_TRACE(Linear, 9, "[beginTableMigration] new migration already exists", 0, 0);
186         } else {
187             turf::LockGuard<turf::Mutex> guard(table->mutex);
188             job = table->jobCoordinator.loadConsume(); // Non-atomic would be sufficient, but that's OK.
189             if (job) {
190                 TURF_TRACE(Linear, 10, "[beginTableMigration] new migration already exists (double-checked)", 0, 0);
191             } else {
192                 // Determine new migration size and cap the number of values that can be added concurrent to the migration.
193                 sreg oldValuesLimit = table->limitNumValues;
194                 sreg oldValuesRemaining = table->valuesRemaining.load(turf::Relaxed);
195                 sreg oldValuesInUse = oldValuesLimit - oldValuesRemaining;
196             calculateNextTableSize:
197                 sreg nextTableSize = turf::util::roundUpPowerOf2(oldValuesInUse * 2);
198                 sreg nextLimitNumValues = nextTableSize * 3 / 4;
199                 if (nextLimitNumValues < oldValuesLimit) {
200                     // Set the new limitNumValues on the *current* table.
201                     // This prevents other threads, while the migration is in progress, from concurrently
202                     // re-inserting more values than the new table can hold.
203                     // To set the new limitNumValues on the current table in an atomic fashion,
204                     // we update its valuesRemaining via CAS loop:
205                     for (;;) {
206                         // We must recalculate desiredValuesRemaining on each iteration of the CAS loop
207                         oldValuesInUse = oldValuesLimit - oldValuesRemaining;
208                         sreg desiredValuesRemaining = nextLimitNumValues - oldValuesInUse;
209                         if (desiredValuesRemaining < 0) {
210                             TURF_TRACE(Linear, 11, "[table] restarting valuesRemaining CAS loop", nextLimitNumValues,
211                                        desiredValuesRemaining);
212                             // Must recalculate nextTableSize. Goto, baby!
213                             goto calculateNextTableSize;
214                         }
215                         if (table->valuesRemaining.compareExchangeWeak(oldValuesRemaining, desiredValuesRemaining, turf::Relaxed,
216                                                                        turf::Relaxed))
217                             break; // Success!
218                         // CAS failed because table->valuesRemaining was modified by another thread.
219                         // An updated value has been reloaded into oldValuesRemaining (modified by reference).
220                         // Recalculate desiredValuesRemaining to account for the updated value, and try again.
221                         TURF_TRACE(Linear, 12, "[table] valuesRemaining CAS failed", oldValuesRemaining, desiredValuesRemaining);
222                     }
223                 }
224                 // Now we are assured that the new table will not become overpopulated during the migration process.
225                 // Create new migration.
226                 TableMigration* migration = new TableMigration(map);
227                 migration->m_source = table;
228                 migration->m_destination = Table::create(nextTableSize, nextLimitNumValues);
229                 migration->m_unitsRemaining.storeNonatomic(table->getNumMigrationUnits());
230                 // Publish the new migration.
231                 table->jobCoordinator.storeRelease(migration);
232             }
233         }
234     }
235 }; // Linear
236
237 template <class Map>
238 bool Linear<Map>::TableMigration::migrateRange(ureg startIdx) {
239     ureg srcSizeMask = m_source->sizeMask;
240     ureg endIdx = turf::util::min(startIdx + TableMigrationUnitSize, srcSizeMask + 1);
241     sreg valuesMigrated = 0;
242     // Iterate over source range.
243     for (ureg srcIdx = startIdx; srcIdx < endIdx; srcIdx++) {
244         Cell* srcCell = m_source->getCells() + (srcIdx & srcSizeMask);
245         Hash srcHash;
246         Value srcValue;
247         // Fetch the srcHash and srcValue.
248         for (;;) {
249             srcHash = srcCell->hash.load(turf::Relaxed);
250             if (srcHash == KeyTraits::NullHash) {
251                 // An unused cell. Try to put a Redirect marker in its value.
252                 srcValue =
253                     srcCell->value.compareExchange(Value(ValueTraits::NullValue), Value(ValueTraits::Redirect), turf::Relaxed);
254                 if (srcValue == Value(ValueTraits::Redirect)) {
255                     // srcValue is already marked Redirect due to previous incomplete migration.
256                     TURF_TRACE(Linear, 13, "[migrateRange] empty cell already redirected", uptr(m_source), srcIdx);
257                     break;
258                 }
259                 if (srcValue == Value(ValueTraits::NullValue))
260                     break; // Redirect has been placed. Break inner loop, continue outer loop.
261                 TURF_TRACE(Linear, 14, "[migrateRange] race to insert key", uptr(m_source), srcIdx);
262                 // Otherwise, somebody just claimed the cell. Read srcHash again...
263             } else {
264                 // Check for deleted/uninitialized value.
265                 srcValue = srcCell->value.load(turf::Relaxed);
266                 if (srcValue == Value(ValueTraits::NullValue)) {
267                     // Try to put a Redirect marker.
268                     if (srcCell->value.compareExchangeStrong(srcValue, Value(ValueTraits::Redirect), turf::Relaxed))
269                         break; // Redirect has been placed. Break inner loop, continue outer loop.
270                     TURF_TRACE(Linear, 15, "[migrateRange] race to insert value", uptr(m_source), srcIdx);
271                 }
272
273                 // We've got a key/value pair to migrate.
274                 // Reserve a destination cell in the destination.
275                 TURF_ASSERT(srcHash != KeyTraits::NullHash);
276                 TURF_ASSERT(srcValue != Value(ValueTraits::NullValue));
277                 TURF_ASSERT(srcValue != Value(ValueTraits::Redirect)); // Incomplete/concurrent migrations are impossible.
278                 Cell* dstCell;
279                 InsertResult result = insert(srcHash, m_destination, dstCell);
280                 // During migration, a hash can only exist in one place among all the source tables,
281                 // and it is only migrated by one thread. Therefore, the hash will never already exist
282                 // in the destination table:
283                 TURF_ASSERT(result != InsertResult_AlreadyFound);
284                 TURF_ASSERT(result != InsertResult_Overflow);
285                 // Migrate the old value to the new cell.
286                 for (;;) {
287                     // Copy srcValue to the destination.
288                     dstCell->value.store(srcValue, turf::Relaxed);
289                     // Try to place a Redirect marker in srcValue.
290                     Value doubleCheckedSrcValue =
291                         srcCell->value.compareExchange(srcValue, Value(ValueTraits::Redirect), turf::Relaxed);
292                     TURF_ASSERT(doubleCheckedSrcValue !=
293                                 Value(ValueTraits::Redirect)); // Only one thread can redirect a cell at a time.
294                     if (doubleCheckedSrcValue == srcValue) {
295                         // No racing writes to the src. We've successfully placed the Redirect marker.
296                         // srcValue was non-NULL when we decided to migrate it, but it may have changed to NULL
297                         // by a late-arriving erase.
298                         if (srcValue == Value(ValueTraits::NullValue))
299                             TURF_TRACE(Linear, 16, "[migrateRange] racing update was erase", uptr(m_source), srcIdx);
300                         else
301                             valuesMigrated++;
302                         break;
303                     }
304                     // There was a late-arriving write (or erase) to the src. Migrate the new value and try again.
305                     TURF_TRACE(Linear, 17, "[migrateRange] race to update migrated value", uptr(m_source), srcIdx);
306                     srcValue = doubleCheckedSrcValue;
307                 }
308                 // Cell successfully migrated. Proceed to next source cell.
309                 break;
310             }
311         }
312     }
313     sreg prevValuesRemaining = m_destination->valuesRemaining.fetchSub(valuesMigrated, turf::Relaxed);
314     TURF_ASSERT(valuesMigrated <= prevValuesRemaining);
315     TURF_UNUSED(prevValuesRemaining);
316     // Range has been migrated successfully.
317     return true;
318 }
319
320 template <class Map>
321 void Linear<Map>::TableMigration::run() {
322     // Conditionally increment the shared # of workers.
323     ureg probeStatus = m_workerStatus.load(turf::Relaxed);
324     do {
325         if (probeStatus & 1) {
326             // End flag is already set, so do nothing.
327             TURF_TRACE(Linear, 18, "[TableMigration::run] already ended", uptr(this), 0);
328             return;
329         }
330     } while (!m_workerStatus.compareExchangeWeak(probeStatus, probeStatus + 2, turf::Relaxed, turf::Relaxed));
331     // # of workers has been incremented, and the end flag is clear.
332     TURF_ASSERT((probeStatus & 1) == 0);
333
334     // Loop over all migration units in the source table.
335     for (;;) {
336         if (m_workerStatus.load(turf::Relaxed) & 1) {
337             TURF_TRACE(Linear, 19, "[TableMigration::run] detected end flag set", uptr(this), 0);
338             goto endMigration;
339         }
340         ureg startIdx = m_sourceIndex.fetchAdd(TableMigrationUnitSize, turf::Relaxed);
341         if (startIdx >= m_source->sizeMask + 1)
342             break; // No more migration units.
343         migrateRange(startIdx);
344         sreg prevRemaining = m_unitsRemaining.fetchSub(1, turf::Relaxed);
345         TURF_ASSERT(prevRemaining > 0);
346         if (prevRemaining == 1) {
347             // That was the last chunk to migrate.
348             m_workerStatus.fetchOr(1, turf::Relaxed);
349             goto endMigration;
350         }
351     }
352     TURF_TRACE(Linear, 20, "[TableMigration::run] out of migration units", uptr(this), 0);
353
354 endMigration:
355     // Decrement the shared # of workers.
356     probeStatus = m_workerStatus.fetchSub(
357         2, turf::AcquireRelease); // AcquireRelease makes all previous writes visible to the last worker thread.
358     if (probeStatus >= 4) {
359         // There are other workers remaining. Return here so that only the very last worker will proceed.
360         TURF_TRACE(Linear, 21, "[TableMigration::run] not the last worker", uptr(this), uptr(probeStatus));
361         return;
362     }
363
364     // We're the very last worker thread.
365     // Publish the new subtree.
366     TURF_ASSERT(probeStatus == 3);
367     m_map.publishTableMigration(this);
368     // End the jobCoodinator.
369     m_source->jobCoordinator.end();
370
371     // We're done with this TableMigration. Queue it for GC.
372     DefaultQSBR.enqueue(&TableMigration::destroy, this);
373 }
374
375 } // namespace details
376 } // namespace junction
377
378 #endif // JUNCTION_DETAILS_LINEAR_H