Summary: Herb Sutter noticed an error in the AHM find pseudo-code at
the C++ conference, and the error is in the real implementation too.
We should be using memory_order_acquire, because we need the load of
the key to happen before any subsequent loads of the value itself.
Our implementation is conservative for a memory_order_relaxed load
(we generate the same code as if it were memory_order_acquire---i.e. a
compiler barrier but no memory barriers), so this should not change the
generated code. Correctness is a good idea, though (when we upgrade
to gcc 4.7's atomics it might not be as conservative). Also, erase was
doing three loads when one is enough. ... also fixes an exception safety
bug in insert. (Phew.)
Test Plan: Ran the AHM unit tests, debug and opt.
Reviewed By: andrei.alexandrescu@fb.com
FB internal diff:
D485744
for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
;
idx = probeNext(idx, numProbes)) {
for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
;
idx = probeNext(idx, numProbes)) {
- const KeyT key = relaxedLoadKey(cells_[idx]);
+ const KeyT key = acquireLoadKey(cells_[idx]);
if (LIKELY(key == key_in)) {
return SimpleRetT(idx, true);
}
if (LIKELY(key == key_in)) {
return SimpleRetT(idx, true);
}
CHECK_NE(key_in, kEmptyKey_);
CHECK_NE(key_in, kLockedKey_);
CHECK_NE(key_in, kErasedKey_);
CHECK_NE(key_in, kEmptyKey_);
CHECK_NE(key_in, kLockedKey_);
CHECK_NE(key_in, kErasedKey_);
- for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
- ;
- idx = probeNext(idx, numProbes)) {
+
+ size_t idx = keyToAnchorIdx(key_in);
+ size_t numProbes = 0;
+ for (;;) {
DCHECK_LT(idx, capacity_);
value_type* cell = &cells_[idx];
if (relaxedLoadKey(*cell) == kEmptyKey_) {
DCHECK_LT(idx, capacity_);
value_type* cell = &cells_[idx];
if (relaxedLoadKey(*cell) == kEmptyKey_) {
new (&cell->second) ValueT(record.second);
unlockCell(cell, key_in); // Sets the new key
} catch (...) {
new (&cell->second) ValueT(record.second);
unlockCell(cell, key_in); // Sets the new key
} catch (...) {
+ // Transition back to empty key---requires handling
+ // locked->empty below.
unlockCell(cell, kEmptyKey_);
--numPendingEntries_;
throw;
unlockCell(cell, kEmptyKey_);
--numPendingEntries_;
throw;
}
}
DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
}
}
DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
- if (kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire)) {
+ if (kLockedKey_ == acquireLoadKey(*cell)) {
- kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire)
+ kLockedKey_ == acquireLoadKey(*cell)
- DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
- DCHECK(relaxedLoadKey(*cell) != kLockedKey_);
- if (key_in == relaxedLoadKey(*cell)) {
+
+ const KeyT thisKey = acquireLoadKey(*cell);
+ if (thisKey == key_in) {
// Found an existing entry for our key, but we don't overwrite the
// previous value.
return SimpleRetT(idx, false);
// Found an existing entry for our key, but we don't overwrite the
// previous value.
return SimpleRetT(idx, false);
+ } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) {
+ // We need to try again (i.e., don't increment numProbes or
+ // advance idx): this case can happen if the constructor for
+ // ValueT threw for this very cell (the rethrow block above).
+ continue;
++numProbes;
if (UNLIKELY(numProbes >= capacity_)) {
// probed every cell...fail
return SimpleRetT(capacity_, false);
}
++numProbes;
if (UNLIKELY(numProbes >= capacity_)) {
// probed every cell...fail
return SimpleRetT(capacity_, false);
}
+
+ idx = probeNext(idx, numProbes);
idx = probeNext(idx, numProbes)) {
DCHECK_LT(idx, capacity_);
value_type* cell = &cells_[idx];
idx = probeNext(idx, numProbes)) {
DCHECK_LT(idx, capacity_);
value_type* cell = &cells_[idx];
- if (relaxedLoadKey(*cell) == kEmptyKey_ ||
- relaxedLoadKey(*cell) == kLockedKey_) {
+ KeyT currentKey = acquireLoadKey(*cell);
+ if (currentKey == kEmptyKey_ || currentKey == kLockedKey_) {
// If we hit an empty (or locked) element, this key does not exist. This
// is similar to how it's handled in find().
return 0;
}
// If we hit an empty (or locked) element, this key does not exist. This
// is similar to how it's handled in find().
return 0;
}
- if (key_in == relaxedLoadKey(*cell)) {
+ if (key_in == currentKey) {
// Found an existing entry for our key, attempt to mark it erased.
// Some other thread may have erased our key, but this is ok.
KeyT expect = key_in;
// Found an existing entry for our key, attempt to mark it erased.
// Some other thread may have erased our key, but this is ok.
KeyT expect = key_in;
- KeyT key = relaxedLoadKey(aha_->cells_[offset_]);
+ KeyT key = acquireLoadKey(aha_->cells_[offset_]);
return key != aha_->kEmptyKey_ &&
key != aha_->kLockedKey_ &&
key != aha_->kErasedKey_;
return key != aha_->kEmptyKey_ &&
key != aha_->kLockedKey_ &&
key != aha_->kErasedKey_;
return cellKeyPtr(r)->load(std::memory_order_relaxed);
}
return cellKeyPtr(r)->load(std::memory_order_relaxed);
}
+ static KeyT acquireLoadKey(const value_type& r) {
+ return cellKeyPtr(r)->load(std::memory_order_acquire);
+ }
+
// Fun with thread local storage - atomic increment is expensive
// (relatively), so we accumulate in the thread cache and periodically
// flush to the actual variable, and walk through the unflushed counts when
// Fun with thread local storage - atomic increment is expensive
// (relatively), so we accumulate in the thread cache and periodically
// flush to the actual variable, and walk through the unflushed counts when
inline bool tryLockCell(value_type* const cell) {
KeyT expect = kEmptyKey_;
return cellKeyPtr(*cell)->compare_exchange_strong(expect, kLockedKey_,
inline bool tryLockCell(value_type* const cell) {
KeyT expect = kEmptyKey_;
return cellKeyPtr(*cell)->compare_exchange_strong(expect, kLockedKey_,
- std::memory_order_acquire);
+ std::memory_order_acq_rel);
}
inline size_t keyToAnchorIdx(const KeyT k) const {
}
inline size_t keyToAnchorIdx(const KeyT k) const {