1 /*------------------------------------------------------------------------
2 Junction: Concurrent data structures in C++
3 Copyright (c) 2016 Jeff Preshing
5 Distributed under the Simplified BSD License.
6 Original location: https://github.com/preshing/junction
8 This software is distributed WITHOUT ANY WARRANTY; without even the
9 implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10 See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
13 #ifndef JUNCTION_CONCURRENTMAP_LINEAR_H
14 #define JUNCTION_CONCURRENTMAP_LINEAR_H
16 #include <junction/Core.h>
17 #include <junction/details/Linear.h>
18 #include <junction/QSBR.h>
19 #include <turf/Heap.h>
20 #include <turf/Trace.h>
// Declares the trace category used by every TURF_TRACE call in this file.
// The call sites below use IDs 0 through 17; the second argument (18) is presumably
// the number of trace points -- confirm against turf/Trace.h.
24 TURF_TRACE_DECLARE(ConcurrentMap_Linear, 18)
// Concurrent map whose table-level operations (find, insert, beginTableMigration) are delegated
// to details::Linear, aliased below as Details.
// K and V are presumably the key and value types, and KT / VT their traits classes.
26 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V>>
27 class ConcurrentMap_Linear {
32 typedef VT ValueTraits;
// Hash: the Unsigned type that turf::util::BestFit selects for Key; holds the result of KeyTraits::hash().
33 typedef typename turf::util::BestFit<Key>::Unsigned Hash;
34 typedef details::Linear<ConcurrentMap_Linear> Details;
// Pointer to the current table. Operations load it with turf::Consume; publishTableMigration()
// replaces it with a Release store.
37 turf::Atomic<typename Details::Table*> m_root;
// Creates the initial table.
// `capacity` is forwarded to Details::Table::create together with a value limit of
// three quarters of the capacity (integer arithmetic: capacity * 3 / 4).
40 ConcurrentMap_Linear(ureg capacity) {
41 ureg limitNumValues = capacity * 3 / 4;
// Non-atomic store; presumably fine because no other thread can reach the map while it is being constructed.
42 m_root.storeNonatomic(Details::Table::create(capacity, limitNumValues));
// Destructor. Reads the root table pointer non-atomically.
// NOTE(review): presumably the table is released right after this load -- confirm.
45 ~ConcurrentMap_Linear() {
46 typename Details::Table* table = m_root.loadNonatomic();
// Installs migration->m_destination as the map's new root table.
50 // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
51 // after all the threads participating in the migration have completed their work.
52 void publishTableMigration(typename Details::TableMigration* migration) {
53 // There are no racing calls to this function.
54 typename Details::Table* oldRoot = m_root.loadNonatomic();
// Release store: publishes the destination table to threads that load m_root with turf::Consume.
55 m_root.store(migration->m_destination, turf::Release);
// Sanity check: the table that was just replaced must be the migration's source table.
56 TURF_ASSERT(oldRoot == migration->m_source);
57 // Caller will GC the TableMigration and the source table.
60 // A Mutator represents a known cell in the hash table.
61 // It's meant for manipulations within a temporary function scope.
62 // Obviously you must not call QSBR::Update while holding a Mutator.
63 // Any operation that modifies the table (exchangeValue, eraseValue)
64 // may be forced to follow a redirected cell, which changes the Mutator itself.
65 // Note that even if the Mutator was constructed from an existing cell,
66 // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
67 // or if another thread deletes the key between the two steps.
// The map constructs Mutators through its insert()/find() members, hence the friend declaration.
70 friend class ConcurrentMap_Linear;
// Map this Mutator operates on.
72 ConcurrentMap_Linear& m_map;
// Table that m_cell belongs to; reloaded from m_map.m_root after a redirect.
73 typename Details::Table* m_table;
// Cell located by Details::find or Details::insert. exchangeValue() and eraseValue() assert that it is set.
74 typename Details::Cell* m_cell;
77 // Constructor: Find existing cell
// Looks `key` up in the current root table without inserting anything.
// m_value starts as NullValue and is overwritten with the cell's value once a cell is found.
// The unnamed bool parameter is never read; it only distinguishes this overload from the
// insert constructor.
78 Mutator(ConcurrentMap_Linear& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
// NOTE(review): m_table is not in the initializer list and is first assigned further down,
// so the value traced on the next line is uninitialized.
79 TURF_TRACE(ConcurrentMap_Linear, 0, "[Mutator] find constructor called", uptr(m_table), uptr(key));
80 Hash hash = KeyTraits::hash(key);
// Each attempt starts from the latest root, since a migration may have replaced the table.
82 m_table = m_map.m_root.load(turf::Consume);
83 m_cell = Details::find(hash, m_table);
86 m_value = m_cell->value.load(turf::Consume);
87 if (m_value != Value(ValueTraits::Redirect))
88 return; // Found an existing value
89 // We've encountered a Redirect value. Help finish the migration.
90 TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0));
91 m_table->jobCoordinator.participate();
92 // Try again using the latest root.
96 // Constructor: Insert cell
// Finds the cell for `key`, or reserves a new one, through Details::insert.
// Outcomes handled below:
//   InsertedNew  - a fresh cell was reserved; m_value stays NullValue.
//   AlreadyFound - the cell's value is loaded into m_value, unless it is Redirect.
//   Overflow     - a table migration is started.
// After a Redirect or an Overflow the constructor takes part in the migration and retries.
97 Mutator(ConcurrentMap_Linear& map, Key key)
98 : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
99 TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
100 Hash hash = KeyTraits::hash(key);
// Reload the root on every attempt; a finished migration replaces it.
102 m_table = m_map.m_root.load(turf::Consume);
103 switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
104 case Details::InsertResult_InsertedNew: {
105 // We've inserted a new cell. Don't load m_cell->value.
108 case Details::InsertResult_AlreadyFound: {
109 // The hash was already found in the table.
110 m_value = m_cell->value.load(turf::Consume);
111 if (m_value == Value(ValueTraits::Redirect)) {
112 // We've encountered a Redirect value.
113 TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
114 break; // Help finish the migration.
116 return; // Found an existing value
118 case Details::InsertResult_Overflow: {
// The table has no room for this hash: request a migration to a new table.
119 Details::beginTableMigration(m_map, m_table);
123 // A migration has been started (either by us, or another thread). Participate until it's complete.
124 m_table->jobCoordinator.participate();
125 // Try again using the latest root.
// Returns the value cached in m_value by the constructor or by the last exchange/erase.
// Does not touch the cell.
130 Value getValue() const {
131 // Return previously loaded value. Don't load it again.
132 return Value(m_value);
// Stores `desired` into this Mutator's cell and, per the comments below, returns the previous value.
// Preconditions (asserted): `desired` is not NullValue, and m_cell is set.
// A cell whose cached value is NullValue needs a permit from m_table->valuesRemaining before the
// write. The permit is handed back if the table is out of permits or if a racing writer fills the
// cell first.
// If the cell holds Redirect, the call helps the migration and re-inserts the hash in the new root.
135 Value exchangeValue(Value desired) {
136 TURF_ASSERT(desired != Value(ValueTraits::NullValue));
137 TURF_ASSERT(m_cell); // Cell must have been found or inserted
138 TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
// Remember the value we expect to replace; the CAS below overwrites m_value on failure.
140 Value oldValue = m_value;
141 s32 prevRemainingValues = s32(-1);
142 if (oldValue == Value(ValueTraits::NullValue)) {
143 // It's a deleted or newly initialized cell.
144 // Decrement valuesRemaining to ensure we have permission to (re)insert a value.
145 prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed);
146 if (prevRemainingValues <= 0) {
147 TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table),
148 prevRemainingValues);
149 // Can't (re)insert any more values.
150 // There are two ways this can happen. One with a TableMigration already in progress, and one without:
151 // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert.
152 // 2. Two threads race to insert the same key, and it's the last free cell in the table.
153 // (Note: We could get rid of the valuesRemaining counter by handling the possibility of migration
155 // as LeapFrog and Grampa do...)
156 m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
157 // Since we don't know whether there's already a TableMigration in progress, always attempt to start one
159 Details::beginTableMigration(m_map, m_table);
// Attempt the write. On failure the CAS stores the cell's current value into m_value.
163 if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
164 // Exchange was successful. Return previous value.
165 TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
166 Value result = m_value;
167 m_value = desired; // Leave the mutator in a valid state
170 // The CAS failed and m_value has been updated with the latest value.
171 if (m_value != Value(ValueTraits::Redirect)) {
172 TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
// We took a permit for an empty cell, but another thread filled it first: give the permit back.
174 if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
175 TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value",
176 uptr(m_table), uptr(m_value));
177 m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
179 // There was a racing write (or erase) to this cell.
180 // Pretend we exchanged with ourselves, and just let the racing write win.
183 // We've encountered a Redirect value. Help finish the migration.
184 TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
186 // Help migrate to new table.
// Take the hash from the old cell so the same entry can be located in the new table.
187 Hash hash = m_cell->hash.load(turf::Relaxed);
189 // Help complete the migration.
190 m_table->jobCoordinator.participate();
191 // Try again in the new table.
192 m_table = m_map.m_root.load(turf::Consume);
// Reset the cached value; it is reloaded below only when the hash is already present.
193 m_value = Value(ValueTraits::NullValue);
194 switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
195 case Details::InsertResult_AlreadyFound:
196 m_value = m_cell->value.load(turf::Consume);
197 if (m_value == Value(ValueTraits::Redirect)) {
198 TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
203 case Details::InsertResult_InsertedNew:
205 case Details::InsertResult_Overflow:
206 TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
208 Details::beginTableMigration(m_map, m_table);
211 // We were redirected... again
214 // Try again in the new table.
// Same as exchangeValue(), but the value returned by exchangeValue() is discarded.
218 void setValue(Value desired) {
219 exchangeValue(desired);
// Erase path: if the cached value is already NullValue there is nothing to erase.
// Otherwise CAS the cell to NullValue; after a Redirect, look the hash up in the new table.
223 TURF_ASSERT(m_cell); // Cell must have been found or inserted
224 TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
226 if (m_value == Value(ValueTraits::NullValue))
227 return Value(m_value);
228 TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
// On failure the CAS stores the cell's current value into m_value.
229 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
230 // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
231 TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
// The cell is empty again: return the permit that exchangeValue() takes before filling an empty cell.
232 m_table->valuesRemaining.fetchAdd(1, turf::Relaxed);
233 Value result = m_value;
234 m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
237 // The CAS failed and m_value has been updated with the latest value.
238 TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
239 m_cell - m_table->getCells());
240 if (m_value != Value(ValueTraits::Redirect)) {
241 // There was a racing write (or erase) to this cell.
242 // Pretend we erased nothing, and just let the racing write win.
243 return Value(ValueTraits::NullValue);
245 // We've been redirected to a new table.
246 TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table),
247 m_cell - m_table->getCells());
248 Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
250 // Help complete the migration.
251 m_table->jobCoordinator.participate();
252 // Try again in the new table.
253 m_table = m_map.m_root.load(turf::Consume);
// Lookup only (no insert): an erase never needs to reserve a new cell.
254 m_cell = Details::find(hash, m_table);
256 m_value = Value(ValueTraits::NullValue);
// NOTE(review): this reload uses turf::Relaxed, while the Mutator constructors load the cell
// value with turf::Consume -- confirm the weaker ordering is intended.
259 m_value = m_cell->value.load(turf::Relaxed);
260 if (m_value != Value(ValueTraits::Redirect))
262 TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
263 m_cell - m_table->getCells());
// Returns a Mutator built with the insert constructor, which finds the cell for `key`
// or reserves a new one.
269 Mutator insert(Key key) {
270 return Mutator(*this, key);
// Returns a Mutator built with the find constructor, which looks `key` up without inserting.
// The `false` argument only selects that overload; the constructor never reads it.
273 Mutator find(Key key) {
274 return Mutator(*this, key, false);
277 // Lookup without creating a temporary Mutator.
// Returns the value stored for the key, or NullValue. If the cell holds Redirect, the call
// takes part in the table migration and then tries again.
279 Hash hash = KeyTraits::hash(key);
280 TURF_TRACE(ConcurrentMap_Linear, 16, "[get] called", uptr(this), uptr(hash));
// Each attempt reads the latest root table.
282 typename Details::Table* table = m_root.load(turf::Consume);
283 typename Details::Cell* cell = Details::find(hash, table);
// NOTE(review): presumably returned when find() yields no cell -- confirm.
285 return Value(ValueTraits::NullValue);
286 Value value = cell->value.load(turf::Consume);
287 if (value != Value(ValueTraits::Redirect))
288 return value; // Found an existing value
289 // We've been redirected to a new table. Help with the migration.
290 TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell));
291 table->jobCoordinator.participate();
292 // Try again in the new table.
// Finds or reserves the cell for `key`, stores `desired` in it, and returns the result of
// Mutator::exchangeValue(). `desired` must not be NullValue (asserted in exchangeValue).
// Note: the body is identical to exchange(Key, Value).
296 Value insert(Key key, Value desired) {
297 Mutator iter(*this, key);
298 return iter.exchangeValue(desired);
// Finds or reserves the cell for `key`, stores `desired` in it, and returns the result of
// Mutator::exchangeValue(). `desired` must not be NullValue (asserted in exchangeValue).
// Note: the body is identical to insert(Key, Value).
301 Value exchange(Key key, Value desired) {
302 Mutator iter(*this, key);
303 return iter.exchangeValue(desired);
// Looks `key` up without inserting (find constructor) and returns the result of
// Mutator::eraseValue().
306 Value erase(Key key) {
307 Mutator iter(*this, key, false);
308 return iter.eraseValue();
311 // The easiest way to implement an Iterator is to prevent all Redirects.
312 // The current Iterator does that by forbidding concurrent inserts.
313 // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
// Table captured when the Iterator was constructed; the iterator walks its cells by index.
316 typename Details::Table* m_table;
// Captures the map's current root table.
322 Iterator(ConcurrentMap_Linear& map) {
323 // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
324 m_table = map.m_root.load(turf::Consume);
// Advance to the next cell that has both a reserved hash and a non-null value,
// or mark the iterator as finished once the index passes sizeMask.
330 TURF_ASSERT(m_table);
331 TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
// Pre-increment, so scanning resumes just after the previously yielded cell.
332 while (++m_idx <= m_table->sizeMask) {
333 // Index still inside range of table.
334 typename Details::Cell* cell = m_table->getCells() + m_idx;
335 m_hash = cell->hash.load(turf::Relaxed);
336 if (m_hash != KeyTraits::NullHash) {
337 // Cell has been reserved.
338 m_value = cell->value.load(turf::Relaxed);
// The iterator assumes no migration is in progress, so a Redirect marker is never expected here.
339 TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
340 if (m_value != Value(ValueTraits::NullValue))
341 return; // Yield this cell.
344 // That's the end of the map.
// Clearing m_value is what makes isValid() return false from now on.
345 m_hash = KeyTraits::NullHash;
346 m_value = ValueTraits::NullValue;
// True while the iterator refers to a cell, i.e. while the cached m_value is not NullValue.
349 bool isValid() const {
350 return m_value != Value(ValueTraits::NullValue);
// Only meaningful while the iterator is valid.
354 TURF_ASSERT(isValid());
355 // Convert the hash cached for the current cell back into a key.
356 return KeyTraits::dehash(m_hash);
// Accessor for the current cell's value; requires a valid iterator (asserted).
359 Value getValue() const {
360 TURF_ASSERT(isValid());
366 } // namespace junction
368 #endif // JUNCTION_CONCURRENTMAP_LINEAR_H