1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_CONCURRENTMAP_GRAMPA_H
14 #define JUNCTION_CONCURRENTMAP_GRAMPA_H
16 #include <junction/Core.h>
17 #include <junction/details/Grampa.h>
18 #include <junction/QSBR.h>
19 #include <turf/Heap.h>
20 #include <turf/Trace.h>
24 TURF_TRACE_DECLARE(ConcurrentMap_Grampa, 27)
// Concurrent map from K to V; KT and VT supply the key and value traits.
26 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V>>
27 class ConcurrentMap_Grampa {
32 typedef VT ValueTraits;
// Hash is the unsigned integer type that turf::util::BestFit selects for Key.
33 typedef typename turf::util::BestFit<Key>::Unsigned Hash;
34 typedef details::Grampa<ConcurrentMap_Grampa> Details;
// Tagged root word: 0 means no table has been created yet; with bit 0 clear it is a Details::Table*;
// with bit 0 set it is a Details::FlatTree* (the tag bit must be masked off before use).
37 turf::Atomic<uptr> m_root;
// Finds the table that covers `hash`, returning it through `table` together with its `sizeMask`.
// NOTE(review): several control-flow lines (branches, loop, return statements) are not visible in this view;
// callers treat a false result as "no table located" -- confirm against the full file.
39 bool locateTable(typename Details::Table*& table, ureg& sizeMask, Hash hash) {
40 ureg root = m_root.load(turf::Consume);
// Flattree case: strip the tag bit to recover the FlatTree pointer.
42 typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (root & ~ureg(1));
// The hash, shifted right by the flattree's safeShift, selects the leaf entry.
44 ureg leafIdx = (hash >> flatTree->safeShift);
45 table = flatTree->getTables()[leafIdx].load(turf::Relaxed);
46 if (ureg(table) != Details::RedirectFlatTree) {
// A real leaf table was found: flattree leaves use the fixed LeafSize mask.
47 sizeMask = (Details::LeafSize - 1);
// The leaf holds the RedirectFlatTree marker: wait for the flattree migration to complete,
// then continue with its destination flattree.
50 TURF_TRACE(ConcurrentMap_Grampa, 0, "[locateTable] flattree lookup redirected", uptr(flatTree), uptr(leafIdx));
51 typename Details::FlatTreeMigration* migration = Details::getExistingFlatTreeMigration(flatTree);
53 migration->m_completed.wait();
54 flatTree = migration->m_destination;
// Single-table case: the root word is the Table pointer itself, and the table carries its own mask.
59 table = (typename Details::Table*) root;
60 sizeMask = table->sizeMask;
// Creates the first table when m_root is still empty.
// The table is allocated before the compare-exchange, so racing callers may each allocate one.
65 void createInitialTable(ureg initialSize) {
66 if (!m_root.load(turf::Relaxed)) {
67 // This could perform DCLI, but let's avoid needing a mutex instead.
68 typename Details::Table* table = Details::Table::create(initialSize, 0, sizeof(Hash) * 8);
// Try to swap the new table into a still-null root.
69 if (m_root.compareExchange(uptr(NULL), uptr(table), turf::Release)) {
// NOTE(review): this branch is traced as the lost-race case; the handling of the unused table
// is not visible in this view -- confirm against the full file.
70 TURF_TRACE(ConcurrentMap_Grampa, 1, "[createInitialTable] race to create initial table", uptr(this), 0);
// Constructs an empty map: m_root starts out null, so no table exists yet.
77 ConcurrentMap_Grampa(ureg initialSize = 0) : m_root(uptr(NULL)) {
78 // FIXME: Support initialSize argument
// initialSize is currently ignored.
79 TURF_UNUSED(initialSize);
// Destructor: walks whatever m_root refers to (a flattree of tables, or a single table).
// Only nonatomic loads are used. NOTE(review): presumably no other thread may use the map during destruction -- confirm.
// NOTE(review): the statements that actually release the tables/flattree are not visible in this view.
82 ~ConcurrentMap_Grampa() {
83 ureg root = m_root.loadNonatomic();
// Flattree case: strip the tag bit to recover the FlatTree pointer.
85 typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (root & ~ureg(1));
// Number of flattree entries: one per distinct value of (hash >> safeShift).
86 ureg size = (Hash(-1) >> flatTree->safeShift) + 1;
// Remembers the previously handled table so that a table occupying consecutive entries is handled once.
87 typename Details::Table* lastTableGCed = NULL;
88 for (ureg i = 0; i < size; i++) {
89 typename Details::Table* t = flatTree->getTables()[i].loadNonatomic();
// No entry may still hold the flattree redirect marker at this point.
90 TURF_ASSERT(ureg(t) != Details::RedirectFlatTree);
91 if (t != lastTableGCed) {
// Single-table case: the root word is the Table pointer itself.
98 typename Details::Table* t = (typename Details::Table*) root;
// Publishes the destination table(s) of a TableMigration into the map's root structure.
// Three cases are handled below: a single table replacing the whole map, multiple tables replacing a
// single root table (which creates a flattree), and a subtree being published into an existing flattree.
103 // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
104 // after all the threads participating in the migration have completed their work.
105 // There are no racing writes to the same range of hashes.
106 void publishTableMigration(typename Details::TableMigration* migration) {
107 TURF_TRACE(ConcurrentMap_Grampa, 2, "[publishTableMigration] called", uptr(migration), 0);
108 if (migration->m_safeShift == 0) {
109 // This TableMigration replaces the entire map with a single table.
110 TURF_ASSERT(migration->m_baseHash == 0);
111 TURF_ASSERT(migration->m_numDestinations == 1);
112 ureg oldRoot = m_root.loadNonatomic(); // There are no racing writes to m_root.
113 // Store the single table in m_root directly.
114 typename Details::Table* newTable = migration->getDestinations()[0];
115 m_root.store(uptr(newTable), turf::Release); // Make table contents visible
116 newTable->isPublished.signal();
// A clear tag bit means the old root was a single table; a set tag bit means it was a flattree.
117 if ((oldRoot & 1) == 0) {
118 TURF_TRACE(ConcurrentMap_Grampa, 3, "[publishTableMigration] replacing single root with single root", uptr(migration), 0);
119 // If oldRoot is a table, it must be the original source of the migration.
120 TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table);
121 // Don't GC it here. The caller will GC it since it's a source of the TableMigration.
123 TURF_TRACE(ConcurrentMap_Grampa, 4, "[publishTableMigration] replacing flattree with single root", uptr(migration), 0);
124 // The entire previous flattree is being replaced.
125 Details::garbageCollectFlatTree((typename Details::FlatTree*) (oldRoot & ~ureg(1)));
127 // Caller will GC the TableMigration.
129 // We are either publishing a subtree of one or more tables, or replacing the entire map with multiple tables.
130 // In either case, there will be a flattree after this function returns.
131 TURF_ASSERT(migration->m_safeShift < sizeof(Hash) * 8); // If m_numDestinations > 1, some index bits must remain after shifting
132 ureg oldRoot = m_root.load(turf::Consume);
133 if ((oldRoot & 1) == 0) {
134 // There's no flattree yet. This means the TableMigration is publishing the full range of hashes.
135 TURF_ASSERT(migration->m_baseHash == 0);
136 TURF_ASSERT((Hash(-1) >> migration->m_safeShift) == (migration->m_numDestinations - 1));
137 // The oldRoot should be the original source of the migration.
138 TURF_ASSERT((typename Details::Table*) oldRoot == migration->getSources()[0].table);
139 // Furthermore, it is guaranteed that there are no racing writes to m_root.
140 // Create a new flattree and store it to m_root.
141 TURF_TRACE(ConcurrentMap_Grampa, 5, "[publishTableMigration] replacing single root with flattree", uptr(migration), 0);
142 typename Details::FlatTree* flatTree = Details::FlatTree::create(migration->m_safeShift);
143 typename Details::Table* prevTable = NULL;
144 for (ureg i = 0; i < migration->m_numDestinations; i++) {
145 typename Details::Table* newTable = migration->getDestinations()[i];
146 flatTree->getTables()[i].storeNonatomic(newTable);
// Signal each distinct destination table once, even when it fills several consecutive entries.
147 if (newTable != prevTable) {
148 newTable->isPublished.signal();
149 prevTable = newTable;
152 m_root.store(uptr(flatTree) | 1, turf::Release); // Ensure visibility of flatTree->tables
153 // Caller will GC the TableMigration.
154 // Caller will also GC the old oldRoot since it's a source of the TableMigration.
156 // There is an existing flattree, and we are publishing one or more tables to it.
157 // Attempt to publish the subtree in a loop.
158 // The loop is necessary because we might get redirected in the middle of publishing.
159 TURF_TRACE(ConcurrentMap_Grampa, 6, "[publishTableMigration] publishing subtree to existing flattree", uptr(migration), 0);
160 typename Details::FlatTree* flatTree = (typename Details::FlatTree*) (oldRoot & ~ureg(1));
// Progress counter that survives redirects, so publishing can resume in the new flattree.
161 ureg subTreeEntriesPublished = 0;
162 typename Details::Table* tableToReplace = migration->getSources()[0].table;
163 // Wait here so that we only replace tables that are fully published.
164 // Otherwise, there will be a race between a subtree and its own children.
165 // (If all ManualResetEvent objects supported isPublished(), we could add a TURF_TRACE counter for this.
166 // In previous tests, such a counter does in fact get hit.)
167 tableToReplace->isPublished.wait();
168 typename Details::Table* prevTable = NULL;
171 if (migration->m_safeShift < flatTree->safeShift) {
172 // We'll need to migrate to larger flattree before publishing our new subtree.
173 // First, try to create a FlatTreeMigration with the necessary properties.
174 // This will fail if an existing FlatTreeMigration has already been created using the same source.
175 // In that case, we'll help complete the existing FlatTreeMigration, then we'll retry the loop.
176 TURF_TRACE(ConcurrentMap_Grampa, 7, "[publishTableMigration] existing flattree too small", uptr(migration), 0);
177 typename Details::FlatTreeMigration* flatTreeMigration = Details::createFlatTreeMigration(*this, flatTree, migration->m_safeShift);
178 tableToReplace->jobCoordinator.runOne(flatTreeMigration);
179 flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible
180 flatTree = flatTreeMigration->m_destination;
181 // The FlatTreeMigration has already been GC'ed by the last worker.
// Each subtree entry is written to `repeat` consecutive flattree entries, since the flattree may
// index with more hash bits (a smaller safeShift) than the migration does.
184 ureg repeat = ureg(1) << (migration->m_safeShift - flatTree->safeShift);
185 ureg dstStartIndex = migration->m_baseHash >> flatTree->safeShift;
186 // The subtree we're about to publish fits inside the flattree.
187 TURF_ASSERT(dstStartIndex + migration->m_numDestinations * repeat - 1 <= Hash(-1) >> flatTree->safeShift);
188 // If a previous attempt to publish got redirected, resume publishing into the new flattree,
189 // starting with the first subtree entry that has not yet been fully published, as given by subTreeEntriesPublished.
190 // (Note: We could, in fact, restart the publish operation starting at entry 0. That would be valid too.
191 // We are the only thread that can modify this particular range of the flattree at this time.)
192 turf::Atomic<typename Details::Table*>* dstLeaf = flatTree->getTables() + dstStartIndex + (subTreeEntriesPublished * repeat);
193 typename Details::Table** subFlatTree = migration->getDestinations();
194 while (subTreeEntriesPublished < migration->m_numDestinations) {
195 typename Details::Table* srcTable = subFlatTree[subTreeEntriesPublished];
196 for (ureg r = repeat; r > 0; r--) {
// Swap the source table for the destination table in this flattree entry.
// On failure, probeTable receives the value actually found in the entry.
197 typename Details::Table* probeTable = tableToReplace;
198 while (!dstLeaf->compareExchangeStrong(probeTable, srcTable, turf::Relaxed)) {
199 if (ureg(probeTable) == Details::RedirectFlatTree) {
200 // We've been redirected.
201 // Help with the FlatTreeMigration, then try again.
202 TURF_TRACE(ConcurrentMap_Grampa, 8, "[publishTableMigration] redirected", uptr(migration), uptr(dstLeaf));
203 typename Details::FlatTreeMigration* flatTreeMigration = Details::getExistingFlatTreeMigration(flatTree);
204 tableToReplace->jobCoordinator.runOne(flatTreeMigration);
205 flatTreeMigration->m_completed.wait(); // flatTreeMigration->m_destination becomes entirely visible
206 flatTree = flatTreeMigration->m_destination;
209 // The only other possibility is that we were previously redirected, and the subtree entry got partially published.
210 TURF_TRACE(ConcurrentMap_Grampa, 9, "[publishTableMigration] recovering from partial publish", uptr(migration), 0);
211 TURF_ASSERT(probeTable == srcTable);
213 // The caller will GC the table(s) being replaced, since they are sources of the TableMigration.
// Signal each distinct destination table once, even when it fills several consecutive entries.
216 if (prevTable != srcTable) {
217 srcTable->isPublished.signal();
218 prevTable = srcTable;
220 subTreeEntriesPublished++;
222 // We've successfully published the migrated sub-flattree.
223 // Caller will GC the TableMigration.
// Installs migration->m_destination as the map's new root flattree (stored with the tag bit set).
231 void publishFlatTreeMigration(typename Details::FlatTreeMigration* migration) {
232 // There are no racing writes.
233 // Old root must be the migration source (a flattree).
234 TURF_ASSERT(m_root.loadNonatomic() == (ureg(migration->m_source) | 1));
235 // Publish the new flattree, making entire table contents visible.
236 m_root.store(uptr(migration->m_destination) | 1, turf::Release);
237 // Don't GC the old flattree. The FlatTreeMigration will do that, since it's a source.
240 // A Mutator represents a known cell in the hash table.
241 // It's meant for manipulations within a temporary function scope.
242 // Obviously you must not call QSBR::Update while holding a Mutator.
243 // Any operation that modifies the table (exchangeValue, eraseValue)
244 // may be forced to follow a redirected cell, which changes the Mutator itself.
245 // Note that even if the Mutator was constructed from an existing cell,
246 // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
247 // or if another thread deletes the key between the two steps.
250 friend class ConcurrentMap_Grampa;
// The map this Mutator operates on.
252 ConcurrentMap_Grampa& m_map;
// Table most recently resolved for the key's hash via locateTable(); changes when a redirect is followed.
253 typename Details::Table* m_table;
// Cell found or inserted for the key's hash in m_table.
255 typename Details::Cell* m_cell;
258 // Constructor: Find existing cell
// Starts with m_value = NullValue, then looks up the key's hash in the table located from the map's root.
// The unnamed bool parameter is unused; it only distinguishes this overload from the insert constructor.
// NOTE(review): the not-found paths and the retry loop are not visible in this view.
259 Mutator(ConcurrentMap_Grampa& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
260 TURF_TRACE(ConcurrentMap_Grampa, 10, "[Mutator] find constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key));
261 Hash hash = KeyTraits::hash(key);
263 if (!m_map.locateTable(m_table, m_sizeMask, hash))
265 m_cell = Details::find(hash, m_table, m_sizeMask);
// Cache the cell's current value in the Mutator.
268 m_value = m_cell->value.load(turf::Consume);
269 if (m_value != Value(ValueTraits::Redirect))
270 return; // Found an existing value
271 // We've encountered a Redirect value. Help finish the migration.
272 TURF_TRACE(ConcurrentMap_Grampa, 11, "[Mutator] find was redirected", uptr(m_table), 0);
273 m_table->jobCoordinator.participate();
274 // Try again using the latest root.
278 // Constructor: Insert cell
// Starts with m_value = NullValue, then finds or reserves a cell for the key's hash.
// NOTE(review): the retry loop and some branch lines are not visible in this view.
279 Mutator(ConcurrentMap_Grampa& map, Key key) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
280 TURF_TRACE(ConcurrentMap_Grampa, 12, "[Mutator] insert constructor called", uptr(map.m_root.load(turf::Relaxed)), uptr(key));
281 Hash hash = KeyTraits::hash(key);
// No table could be located yet: create the initial one.
283 if (!m_map.locateTable(m_table, m_sizeMask, hash)) {
284 m_map.createInitialTable(Details::MinTableSize);
287 switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
288 case Details::InsertResult_InsertedNew: {
289 // We've inserted a new cell. Don't load m_cell->value.
292 case Details::InsertResult_AlreadyFound: {
293 // The hash was already found in the table.
294 m_value = m_cell->value.load(turf::Consume);
295 if (m_value == Value(ValueTraits::Redirect)) {
296 // We've encountered a Redirect value.
297 TURF_TRACE(ConcurrentMap_Grampa, 13, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
298 break; // Help finish the migration.
300 return; // Found an existing value
// The insert overflowed: start a table migration using the reported overflow index.
302 case Details::InsertResult_Overflow: {
303 Details::beginTableMigration(m_map, m_table, overflowIdx);
307 // A migration has been started (either by us, or another thread). Participate until it's complete.
308 m_table->jobCoordinator.participate();
310 // Try again using the latest root.
// Returns the value cached in this Mutator, without touching the cell again.
315 Value getValue() const {
316 // Return previously loaded value. Don't load it again.
// Attempts to replace the cell's value with `desired` via compare-and-swap against the cached m_value.
// On success the previous value is returned. On a racing write the race is conceded (see comments below).
// On a Redirect the Mutator helps the migration, re-locates its cell in the latest table, and retries.
// `desired` must not be NullValue, and a cell must already have been found or inserted.
// NOTE(review): the retry loops and several return statements are not visible in this view.
320 Value exchangeValue(Value desired) {
321 TURF_ASSERT(desired != Value(ValueTraits::NullValue));
322 TURF_ASSERT(m_cell); // Cell must have been found or inserted
323 TURF_TRACE(ConcurrentMap_Grampa, 14, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
// Remember the value we expected, so a failed CAS can tell whether a racing write inserted a new value.
325 Value oldValue = m_value;
326 if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
327 // Exchange was successful. Return previous value.
328 TURF_TRACE(ConcurrentMap_Grampa, 15, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
329 Value result = m_value;
330 m_value = desired; // Leave the mutator in a valid state
333 // The CAS failed and m_value has been updated with the latest value.
334 if (m_value != Value(ValueTraits::Redirect)) {
335 TURF_TRACE(ConcurrentMap_Grampa, 16, "[Mutator::exchangeValue] detected race to write value", uptr(m_table), uptr(m_value));
336 if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
337 TURF_TRACE(ConcurrentMap_Grampa, 17, "[Mutator::exchangeValue] racing write inserted new value", uptr(m_table), uptr(m_value));
339 // There was a racing write (or erase) to this cell.
340 // Pretend we exchanged with ourselves, and just let the racing write win.
343 // We've encountered a Redirect value. Help finish the migration.
344 TURF_TRACE(ConcurrentMap_Grampa, 18, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
// The hash is re-read from the cell because the Mutator does not store it.
345 Hash hash = m_cell->hash.load(turf::Relaxed);
347 // Help complete the migration.
348 m_table->jobCoordinator.participate();
349 // Try again in the latest table.
350 // FIXME: locateTable() could return false if the map is concurrently cleared (m_root set to 0).
351 // This is not a concern yet since clear() is not implemented.
352 bool exists = m_map.locateTable(m_table, m_sizeMask, hash);
355 m_value = Value(ValueTraits::NullValue);
357 switch (Details::insert(hash, m_table, m_sizeMask, m_cell, overflowIdx)) { // Modifies m_cell
358 case Details::InsertResult_AlreadyFound:
359 m_value = m_cell->value.load(turf::Consume);
360 if (m_value == Value(ValueTraits::Redirect)) {
361 TURF_TRACE(ConcurrentMap_Grampa, 19, "[Mutator::exchangeValue] was re-redirected", uptr(m_table), uptr(m_value));
365 case Details::InsertResult_InsertedNew:
367 case Details::InsertResult_Overflow:
368 TURF_TRACE(ConcurrentMap_Grampa, 20, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table), overflowIdx);
369 Details::beginTableMigration(m_map, m_table, overflowIdx);
372 // We were redirected... again
376 // Try again in the new table.
// Same as exchangeValue(), but discards the returned value.
380 void setValue(Value desired) {
381 exchangeValue(desired);
// Body of Mutator::eraseValue() (named by the trace messages below): swaps the cell's value for NullValue.
// Returns the erased value on success, or NullValue when a racing write (or erase) wins.
// On a Redirect the Mutator helps the migration, re-locates its cell in the latest table, and retries.
// NOTE(review): the function header, retry loops and several return statements are not visible in this view.
385 TURF_ASSERT(m_cell); // Cell must have been found or inserted
386 TURF_TRACE(ConcurrentMap_Grampa, 21, "[Mutator::eraseValue] called", uptr(m_table), uptr(m_value));
// Nothing to erase when the cached value is already NullValue.
388 if (m_value == Value(ValueTraits::NullValue))
390 TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
391 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
392 // Exchange was successful and a non-NullValue value was erased and returned by reference in m_value.
393 TURF_ASSERT(m_value != Value(ValueTraits::NullValue)); // Implied by the test at the start of the loop.
394 Value result = m_value;
395 m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
398 // The CAS failed and m_value has been updated with the latest value.
399 TURF_TRACE(ConcurrentMap_Grampa, 22, "[Mutator::eraseValue] detected race to write value", uptr(m_table), uptr(m_value));
400 if (m_value != Value(ValueTraits::Redirect)) {
401 // There was a racing write (or erase) to this cell.
402 // Pretend we erased nothing, and just let the racing write win.
403 return Value(ValueTraits::NullValue);
405 // We've been redirected to a new table.
406 TURF_TRACE(ConcurrentMap_Grampa, 23, "[Mutator::eraseValue] was redirected", uptr(m_table), uptr(m_cell));
407 Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
409 // Help complete the migration.
410 m_table->jobCoordinator.participate();
411 // Try again in the latest table.
412 if (!m_map.locateTable(m_table, m_sizeMask, hash))
415 m_cell = Details::find(hash, m_table, m_sizeMask);
// Reset the cached value to NullValue.
// NOTE(review): presumably the "cell not found in the new table" path -- confirm.
417 m_value = Value(ValueTraits::NullValue);
// Refresh the cached value from the re-located cell; if it is not a Redirect, the erase can be retried.
420 m_value = m_cell->value.load(turf::Relaxed);
421 if (m_value != Value(ValueTraits::Redirect))
423 TURF_TRACE(ConcurrentMap_Grampa, 24, "[Mutator::eraseValue] was re-redirected", uptr(m_table), uptr(m_cell));
// Returns a Mutator built with the insert constructor for `key`.
429 Mutator insert(Key key) {
430 return Mutator(*this, key);
// Returns a Mutator built with the find constructor for `key`; the bool argument only selects that overload.
433 Mutator find(Key key) {
434 return Mutator(*this, key, false);
437 // Lookup without creating a temporary Mutator.
// Returns the value stored for the key, or NullValue when no table or cell is located.
// If the cell holds a Redirect, the caller helps the migration and looks again.
// NOTE(review): the function header and the retry loop are not visible in this view.
439 Hash hash = KeyTraits::hash(key);
440 TURF_TRACE(ConcurrentMap_Grampa, 25, "[get] called", uptr(this), uptr(hash));
442 typename Details::Table* table;
444 if (!locateTable(table, sizeMask, hash))
445 return Value(ValueTraits::NullValue);
446 typename Details::Cell* cell = Details::find(hash, table, sizeMask);
448 return Value(ValueTraits::NullValue);
449 Value value = cell->value.load(turf::Consume);
450 if (value != Value(ValueTraits::Redirect))
451 return value; // Found an existing value
452 // We've been redirected to a new table. Help with the migration.
453 TURF_TRACE(ConcurrentMap_Grampa, 26, "[get] was redirected", uptr(table), 0);
454 table->jobCoordinator.participate();
455 // Try again in the new table.
// Finds or inserts the cell for `key`, then exchanges its value with `desired`.
// Returns whatever Mutator::exchangeValue() returns.
459 Value insert(Key key, Value desired) {
460 Mutator iter(*this, key);
461 return iter.exchangeValue(desired);
// Same implementation as insert(key, desired): uses the insert Mutator and returns exchangeValue()'s result.
464 Value exchange(Key key, Value desired) {
465 Mutator iter(*this, key);
466 return iter.exchangeValue(desired);
// Looks up `key` with the find Mutator (no cell is inserted) and returns Mutator::eraseValue()'s result.
469 Value erase(Key key) {
470 Mutator iter(*this, key, false);
471 return iter.eraseValue();
474 // The easiest way to implement an Iterator is to prevent all Redirects.
475 // The current Iterator does that by forbidding concurrent inserts.
476 // To make it work with concurrent inserts, we'd need a way to block TableMigrations as the Iterator visits each table.
477 // FlatTreeMigrations, too.
// Flattree being iterated, taken from the map's root when the root is a flattree.
480 typename Details::FlatTree* m_flatTree;
// Table currently being scanned.
482 typename Details::Table* m_table;
// Positions the iterator on the map's first table: the first flattree entry when the root is a flattree,
// otherwise the single root table.
// NOTE(review): the branch lines, index initialisation and any initial advance are not visible in this view.
488 Iterator(ConcurrentMap_Grampa& map) {
489 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
490 ureg root = map.m_root.load(turf::Consume);
// Flattree case: strip the tag bit to recover the FlatTree pointer.
492 m_flatTree = (typename Details::FlatTree*) (root & ~ureg(1));
493 TURF_ASSERT(m_flatTree->getSize() > 0);
495 m_table = m_flatTree->getTables()[0].load(turf::Consume);
496 TURF_ASSERT(m_table);
// Single-table case: the root word is the Table pointer itself.
501 m_table = (typename Details::Table*) root;
// Start out in the invalid state (null hash, null value).
507 m_hash = KeyTraits::NullHash;
508 m_value = Value(ValueTraits::NullValue);
// Advance step of the Iterator: scans forward for the next reserved cell with a non-NullValue value,
// moving on to the next distinct table in the flattree when the current table is exhausted.
// When nothing is left, the iterator ends in the invalid state (null hash, null value).
// NOTE(review): the function header, index updates and loop lines are not visible in this view.
513 TURF_ASSERT(m_table);
514 TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
518 if (m_idx <= m_table->sizeMask) {
519 // Index still inside range of table.
// Cells are addressed in groups of four: the group is m_idx >> 2, the slot within it is m_idx & 3.
520 typename Details::CellGroup* group = m_table->getCellGroups() + (m_idx >> 2);
521 typename Details::Cell *cell = group->cells + (m_idx & 3);
522 m_hash = cell->hash.load(turf::Relaxed);
523 if (m_hash != KeyTraits::NullHash) {
524 // Cell has been reserved.
525 m_value = cell->value.load(turf::Relaxed);
526 TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
527 if (m_value != Value(ValueTraits::NullValue))
528 return; // Yield this cell.
531 // We've advanced past the end of this table.
533 // Scan for the next unique table in the flattree.
534 while (++m_flatTreeIdx < m_flatTree->getSize()) {
535 typename Details::Table* nextTable = m_flatTree->getTables()[m_flatTreeIdx].load(turf::Consume);
// Entries equal to the current table are skipped, so each table is scanned once.
536 if (nextTable != m_table) {
537 // Found the next table.
540 goto searchInTable; // Continue iterating in this table.
544 // That's the end of the entire map.
545 m_hash = KeyTraits::NullHash;
546 m_value = Value(ValueTraits::NullValue);
// True while the iterator refers to a cell; the end of iteration is marked by m_value == NullValue.
552 bool isValid() const {
553 return m_value != Value(ValueTraits::NullValue);
// Returns the current cell's key. Requires a valid iterator.
// NOTE(review): the function header is not visible in this view.
557 TURF_ASSERT(isValid());
558 // The key is recovered from the stored hash via KeyTraits::dehash().
559 return KeyTraits::dehash(m_hash);
// Value accessor for the current cell. Requires a valid iterator.
// NOTE(review): the return statement is not visible in this view.
562 Value getValue() const {
563 TURF_ASSERT(isValid());
569 } // namespace junction
571 #endif // JUNCTION_CONCURRENTMAP_GRAMPA_H