6d0fa986abf281166a1de3541d429bcbe7062fcd
[junction.git] / junction / ConcurrentMap_Linear.h
1 /*------------------------------------------------------------------------
2   Junction: Concurrent data structures in C++
3   Copyright (c) 2016 Jeff Preshing
4
5   Distributed under the Simplified BSD License.
6   Original location: https://github.com/preshing/junction
7
8   This software is distributed WITHOUT ANY WARRANTY; without even the
9   implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.
10   See the LICENSE file for more information.
11 ------------------------------------------------------------------------*/
12
13 #ifndef JUNCTION_CONCURRENTMAP_LINEAR_H
14 #define JUNCTION_CONCURRENTMAP_LINEAR_H
15
16 #include <junction/Core.h>
17 #include <junction/details/Linear.h>
18 #include <junction/QSBR.h>
19 #include <turf/Heap.h>
20 #include <turf/Trace.h>
21
22 namespace junction {
23
24 TURF_TRACE_DECLARE(ConcurrentMap_Linear, 18)
25
26 template <typename K, typename V, class KT = DefaultKeyTraits<K>, class VT = DefaultValueTraits<V>>
27 class ConcurrentMap_Linear {
28 public:
29     typedef K Key;
30     typedef V Value;
31     typedef KT KeyTraits;
32     typedef VT ValueTraits;
33     typedef typename turf::util::BestFit<Key>::Unsigned Hash;
34     typedef details::Linear<ConcurrentMap_Linear> Details;
35
36 private:
37     turf::Atomic<typename Details::Table*> m_root;
38
39 public:
40     ConcurrentMap_Linear(ureg capacity) {
41         ureg limitNumValues = capacity * 3 / 4;
42         m_root.storeNonatomic(Details::Table::create(capacity, limitNumValues));
43     }
44
45     ~ConcurrentMap_Linear() {
46         typename Details::Table* table = m_root.loadNonatomic();
47         table->destroy();
48     }
49
50     // publishTableMigration() is called by exactly one thread from Details::TableMigration::run()
51     // after all the threads participating in the migration have completed their work.
52     void publishTableMigration(typename Details::TableMigration* migration) {
53         // There are no racing calls to this function.
54         typename Details::Table* oldRoot = m_root.loadNonatomic();
55         m_root.store(migration->m_destination, turf::Release);
56         TURF_ASSERT(oldRoot == migration->m_source);
57         // Caller will GC the TableMigration and the source table.
58     }
59
60     // A Mutator represents a known cell in the hash table.
61     // It's meant for manipulations within a temporary function scope.
62     // Obviously you must not call QSBR::Update while holding a Mutator.
63     // Any operation that modifies the table (exchangeValue, eraseValue)
64     // may be forced to follow a redirected cell, which changes the Mutator itself.
65     // Note that even if the Mutator was constructed from an existing cell,
66     // exchangeValue() can still trigger a resize if the existing cell was previously marked deleted,
67     // or if another thread deletes the key between the two steps.
68     class Mutator {
69     private:
70         friend class ConcurrentMap_Linear;
71
72         ConcurrentMap_Linear& m_map;
73         typename Details::Table* m_table;
74         typename Details::Cell* m_cell;
75         Value m_value;
76
77         // Constructor: Find existing cell
78         Mutator(ConcurrentMap_Linear& map, Key key, bool) : m_map(map), m_value(Value(ValueTraits::NullValue)) {
79             TURF_TRACE(ConcurrentMap_Linear, 0, "[Mutator] find constructor called", uptr(m_table), uptr(key));
80             Hash hash = KeyTraits::hash(key);
81             for (;;) {
82                 m_table = m_map.m_root.load(turf::Consume);
83                 m_cell = Details::find(hash, m_table);
84                 if (!m_cell)
85                     return;
86                 m_value = m_cell->value.load(turf::Consume);
87                 if (m_value != Value(ValueTraits::Redirect))
88                     return; // Found an existing value
89                 // We've encountered a Redirect value. Help finish the migration.
90                 TURF_TRACE(ConcurrentMap_Linear, 1, "[Mutator] find was redirected", uptr(m_table), uptr(0));
91                 m_table->jobCoordinator.participate();
92                 // Try again using the latest root.
93             }
94         }
95
96         // Constructor: Insert cell
97         Mutator(ConcurrentMap_Linear& map, Key key)
98             : m_map(map), m_table(map.m_root.load(turf::Consume)), m_value(Value(ValueTraits::NullValue)) {
99             TURF_TRACE(ConcurrentMap_Linear, 2, "[Mutator] insert constructor called", uptr(m_table), uptr(key));
100             Hash hash = KeyTraits::hash(key);
101             for (;;) {
102                 m_table = m_map.m_root.load(turf::Consume);
103                 switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
104                 case Details::InsertResult_InsertedNew: {
105                     // We've inserted a new cell. Don't load m_cell->value.
106                     return;
107                 }
108                 case Details::InsertResult_AlreadyFound: {
109                     // The hash was already found in the table.
110                     m_value = m_cell->value.load(turf::Consume);
111                     if (m_value == Value(ValueTraits::Redirect)) {
112                         // We've encountered a Redirect value.
113                         TURF_TRACE(ConcurrentMap_Linear, 3, "[Mutator] insert was redirected", uptr(m_table), uptr(m_value));
114                         break; // Help finish the migration.
115                     }
116                     return; // Found an existing value
117                 }
118                 case Details::InsertResult_Overflow: {
119                     Details::beginTableMigration(m_map, m_table);
120                     break;
121                 }
122                 }
123                 // A migration has been started (either by us, or another thread). Participate until it's complete.
124                 m_table->jobCoordinator.participate();
125                 // Try again using the latest root.
126             }
127         }
128
129     public:
130         Value getValue() const {
131             // Return previously loaded value. Don't load it again.
132             return Value(m_value);
133         }
134
135         Value exchangeValue(Value desired) {
136             TURF_ASSERT(desired != Value(ValueTraits::NullValue));
137             TURF_ASSERT(m_cell); // Cell must have been found or inserted
138             TURF_TRACE(ConcurrentMap_Linear, 4, "[Mutator::exchangeValue] called", uptr(m_table), uptr(m_value));
139             for (;;) {
140                 Value oldValue = m_value;
141                 s32 prevRemainingValues = s32(-1);
142                 if (oldValue == Value(ValueTraits::NullValue)) {
143                     // It's a deleted or newly initialized cell.
144                     // Decrement remainingValues to ensure we have permission to (re)insert a value.
145                     prevRemainingValues = m_table->valuesRemaining.fetchSub(1, turf::Relaxed);
146                     if (prevRemainingValues <= 0) {
147                         TURF_TRACE(ConcurrentMap_Linear, 5, "[Mutator::exchangeValue] ran out of valuesRemaining", uptr(m_table),
148                                    prevRemainingValues);
149                         // Can't (re)insert any more values.
150                         // There are two ways this can happen. One with a TableMigration already in progress, and one without:
151                         // 1. A TableMigration puts a cap on the number of values late-arriving threads are allowed to insert.
152                         // 2. Two threads race to insert the same key, and it's the last free cell in the table.
153                         // (Note: We could get tid of the valuesRemaining counter by handling the possibility of migration
154                         // failure,
155                         // as LeapFrog and Grampa do...)
156                         m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
157                         // Since we don't know whether there's already a TableMigration in progress, always attempt to start one
158                         // here:
159                         Details::beginTableMigration(m_map, m_table);
160                         goto helpMigrate;
161                     }
162                 }
163                 if (m_cell->value.compareExchangeStrong(m_value, desired, turf::ConsumeRelease)) {
164                     // Exchange was successful. Return previous value.
165                     TURF_TRACE(ConcurrentMap_Linear, 6, "[Mutator::exchangeValue] exchanged Value", uptr(m_value), uptr(desired));
166                     Value result = m_value;
167                     m_value = desired; // Leave the mutator in a valid state
168                     return result;
169                 }
170                 // The CAS failed and m_value has been updated with the latest value.
171                 if (m_value != Value(ValueTraits::Redirect)) {
172                     TURF_TRACE(ConcurrentMap_Linear, 7, "[Mutator::exchangeValue] detected race to write value", uptr(m_table),
173                                uptr(m_value));
174                     if (oldValue == Value(ValueTraits::NullValue) && m_value != Value(ValueTraits::NullValue)) {
175                         TURF_TRACE(ConcurrentMap_Linear, 8, "[Mutator::exchangeValue] racing write inserted new value",
176                                    uptr(m_table), uptr(m_value));
177                         m_table->valuesRemaining.fetchAdd(1, turf::Relaxed); // Undo valuesRemaining decrement
178                     }
179                     // There was a racing write (or erase) to this cell.
180                     // Pretend we exchanged with ourselves, and just let the racing write win.
181                     return desired;
182                 }
183                 // We've encountered a Redirect value. Help finish the migration.
184                 TURF_TRACE(ConcurrentMap_Linear, 9, "[Mutator::exchangeValue] was redirected", uptr(m_table), uptr(m_value));
185             helpMigrate:
186                 // Help migrate to new table.
187                 Hash hash = m_cell->hash.load(turf::Relaxed);
188                 for (;;) {
189                     // Help complete the migration.
190                     m_table->jobCoordinator.participate();
191                     // Try again in the new table.
192                     m_table = m_map.m_root.load(turf::Consume);
193                     m_value = Value(ValueTraits::NullValue);
194                     switch (Details::insert(hash, m_table, m_cell)) { // Modifies m_cell
195                     case Details::InsertResult_AlreadyFound:
196                         m_value = m_cell->value.load(turf::Consume);
197                         if (m_value == Value(ValueTraits::Redirect)) {
198                             TURF_TRACE(ConcurrentMap_Linear, 10, "[Mutator::exchangeValue] was re-redirected", uptr(m_table),
199                                        uptr(m_value));
200                             break;
201                         }
202                         goto breakOuter;
203                     case Details::InsertResult_InsertedNew:
204                         goto breakOuter;
205                     case Details::InsertResult_Overflow:
206                         TURF_TRACE(ConcurrentMap_Linear, 11, "[Mutator::exchangeValue] overflow after redirect", uptr(m_table),
207                                    0);
208                         Details::beginTableMigration(m_map, m_table);
209                         break;
210                     }
211                     // We were redirected... again
212                 }
213             breakOuter:;
214                 // Try again in the new table.
215             }
216         }
217
218         void setValue(Value desired) {
219             exchangeValue(desired);
220         }
221
222         Value eraseValue() {
223             TURF_ASSERT(m_cell); // Cell must have been found or inserted
224             TURF_TRACE(ConcurrentMap_Linear, 12, "[Mutator::eraseValue] called", uptr(m_table), m_cell - m_table->getCells());
225             for (;;) {
226                 if (m_value == Value(ValueTraits::NullValue))
227                     return Value(m_value);
228                 TURF_ASSERT(m_cell); // m_value is non-NullValue, therefore cell must have been found or inserted.
229                 if (m_cell->value.compareExchangeStrong(m_value, Value(ValueTraits::NullValue), turf::Consume)) {
230                     // Exchange was successful and a non-NULL value was erased and returned by reference in m_value.
231                     TURF_ASSERT(m_value != ValueTraits::NullValue); // Implied by the test at the start of the loop.
232                     m_table->valuesRemaining.fetchAdd(1, turf::Relaxed);
233                     Value result = m_value;
234                     m_value = Value(ValueTraits::NullValue); // Leave the mutator in a valid state
235                     return result;
236                 }
237                 // The CAS failed and m_value has been updated with the latest value.
238                 TURF_TRACE(ConcurrentMap_Linear, 13, "[Mutator::eraseValue] detected race to write value", uptr(m_table),
239                            m_cell - m_table->getCells());
240                 if (m_value != Value(ValueTraits::Redirect)) {
241                     // There was a racing write (or erase) to this cell.
242                     // Pretend we erased nothing, and just let the racing write win.
243                     return Value(ValueTraits::NullValue);
244                 }
245                 // We've been redirected to a new table.
246                 TURF_TRACE(ConcurrentMap_Linear, 14, "[Mutator::eraseValue] was redirected", uptr(m_table),
247                            m_cell - m_table->getCells());
248                 Hash hash = m_cell->hash.load(turf::Relaxed); // Re-fetch hash
249                 for (;;) {
250                     // Help complete the migration.
251                     m_table->jobCoordinator.participate();
252                     // Try again in the new table.
253                     m_table = m_map.m_root.load(turf::Consume);
254                     m_cell = Details::find(hash, m_table);
255                     if (!m_cell) {
256                         m_value = Value(ValueTraits::NullValue);
257                         return m_value;
258                     }
259                     m_value = m_cell->value.load(turf::Relaxed);
260                     if (m_value != Value(ValueTraits::Redirect))
261                         break;
262                     TURF_TRACE(ConcurrentMap_Linear, 15, "[Mutator::eraseValue] was re-redirected", uptr(m_table),
263                                m_cell - m_table->getCells());
264                 }
265             }
266         }
267     };
268
269     Mutator insert(Key key) {
270         return Mutator(*this, key);
271     }
272
273     Mutator find(Key key) {
274         return Mutator(*this, key, false);
275     }
276
277     // Lookup without creating a temporary Mutator.
278     Value get(Key key) {
279         Hash hash = KeyTraits::hash(key);
280         TURF_TRACE(ConcurrentMap_Linear, 16, "[get] called", uptr(this), uptr(hash));
281         for (;;) {
282             typename Details::Table* table = m_root.load(turf::Consume);
283             typename Details::Cell* cell = Details::find(hash, table);
284             if (!cell)
285                 return Value(ValueTraits::NullValue);
286             Value value = cell->value.load(turf::Consume);
287             if (value != Value(ValueTraits::Redirect))
288                 return value; // Found an existing value
289             // We've been redirected to a new table. Help with the migration.
290             TURF_TRACE(ConcurrentMap_Linear, 17, "[get] was redirected", uptr(table), uptr(cell));
291             table->jobCoordinator.participate();
292             // Try again in the new table.
293         }
294     }
295
296     Value insert(Key key, Value desired) {
297         Mutator iter(*this, key);
298         return iter.exchangeValue(desired);
299     }
300
301     Value exchange(Key key, Value desired) {
302         Mutator iter(*this, key);
303         return iter.exchangeValue(desired);
304     }
305
306     Value erase(Key key) {
307         Mutator iter(*this, key, false);
308         return iter.eraseValue();
309     }
310
311     // The easiest way to implement an Iterator is to prevent all Redirects.
312     // The currrent Iterator does that by forbidding concurrent inserts.
313     // To make it work with concurrent inserts, we'd need a way to block TableMigrations.
314     class Iterator {
315     private:
316         typename Details::Table* m_table;
317         ureg m_idx;
318         Key m_hash;
319         Value m_value;
320
321     public:
322         Iterator(ConcurrentMap_Linear& map) {
323             // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
324             m_table = map.m_root.load(turf::Consume);
325             m_idx = -1;
326             next();
327         }
328
329         void next() {
330             TURF_ASSERT(m_table);
331             TURF_ASSERT(isValid() || m_idx == -1); // Either the Iterator is already valid, or we've just started iterating.
332             while (++m_idx <= m_table->sizeMask) {
333                 // Index still inside range of table.
334                 typename Details::Cell* cell = m_table->getCells() + m_idx;
335                 m_hash = cell->hash.load(turf::Relaxed);
336                 if (m_hash != KeyTraits::NullHash) {
337                     // Cell has been reserved.
338                     m_value = cell->value.load(turf::Relaxed);
339                     TURF_ASSERT(m_value != Value(ValueTraits::Redirect));
340                     if (m_value != Value(ValueTraits::NullValue))
341                         return; // Yield this cell.
342                 }
343             }
344             // That's the end of the map.
345             m_hash = KeyTraits::NullHash;
346             m_value = ValueTraits::NullValue;
347         }
348
349         bool isValid() const {
350             return m_value != Value(ValueTraits::NullValue);
351         }
352
353         Key getKey() const {
354             TURF_ASSERT(isValid());
355             // Since we've forbidden concurrent inserts (for now), nonatomic would suffice here, but let's plan ahead:
356             return KeyTraits::dehash(m_hash);
357         }
358
359         Value getValue() const {
360             TURF_ASSERT(isValid());
361             return m_value;
362         }
363     };
364 };
365
366 } // namespace junction
367
368 #endif // JUNCTION_CONCURRENTMAP_LINEAR_H