From 8eda5c6b51581591d3c219f2b4b1b50811801280 Mon Sep 17 00:00:00 2001 From: Jordan DeLong Date: Sun, 3 Jun 2012 14:58:50 -0700 Subject: [PATCH] Fix some memory_order parameters in AtomicHashMap Summary: Herb Sutter noticed an error in the AHM find pseudo-code at the C++ conference, and the error is in the real implementation too. We should be using memory_order_acquire, because we need the load of the key to happen before any subsequent loads of the value itself. Our implementation is conservative for a memory_order_relaxed load (we generate the same code as if it were memory_order_acquire---i.e. a compiler barrier but no memory barriers), so this should not change the generated code. Correctness is a good idea, though (when we upgrade to gcc 4.7's atomics it might not be as conservative). Also, erase was doing three loads when one is enough. ... also fixes an exception safety bug in insert. (Phew.) Test Plan: Ran the AHM unit tests, debug and opt. Reviewed By: andrei.alexandrescu@fb.com FB internal diff: D485744 --- folly/AtomicHashArray-inl.h | 37 ++++++++++++++++++++++++------------- folly/AtomicHashArray.h | 6 +++++- 2 files changed, 29 insertions(+), 14 deletions(-) diff --git a/folly/AtomicHashArray-inl.h b/folly/AtomicHashArray-inl.h index 4d83f38b..5d1a5b1e 100644 --- a/folly/AtomicHashArray-inl.h +++ b/folly/AtomicHashArray-inl.h @@ -51,7 +51,7 @@ findInternal(const KeyT key_in) { for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0; ; idx = probeNext(idx, numProbes)) { - const KeyT key = relaxedLoadKey(cells_[idx]); + const KeyT key = acquireLoadKey(cells_[idx]); if (LIKELY(key == key_in)) { return SimpleRetT(idx, true); } @@ -87,9 +87,10 @@ insertInternal(const value_type& record) { CHECK_NE(key_in, kEmptyKey_); CHECK_NE(key_in, kLockedKey_); CHECK_NE(key_in, kErasedKey_); - for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0; - ; - idx = probeNext(idx, numProbes)) { + + size_t idx = keyToAnchorIdx(key_in); + size_t numProbes = 0; + for (;;) { DCHECK_LT(idx, capacity_); value_type* cell = &cells_[idx]; if (relaxedLoadKey(*cell) == kEmptyKey_) { @@ -133,6 +134,8 @@ insertInternal(const value_type& record) { new (&cell->second) ValueT(record.second); unlockCell(cell, key_in); // Sets the new key } catch (...) { + // Transition back to empty key---requires handling + // locked->empty below. unlockCell(cell, kEmptyKey_); --numPendingEntries_; throw; @@ -149,23 +152,31 @@ insertInternal(const value_type& record) { } } DCHECK(relaxedLoadKey(*cell) != kEmptyKey_); - if (kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire)) { + if (kLockedKey_ == acquireLoadKey(*cell)) { FOLLY_SPIN_WAIT( - kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire) + kLockedKey_ == acquireLoadKey(*cell) ); } - DCHECK(relaxedLoadKey(*cell) != kEmptyKey_); - DCHECK(relaxedLoadKey(*cell) != kLockedKey_); - if (key_in == relaxedLoadKey(*cell)) { + + const KeyT thisKey = acquireLoadKey(*cell); + if (thisKey == key_in) { // Found an existing entry for our key, but we don't overwrite the // previous value. return SimpleRetT(idx, false); + } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) { + // We need to try again (i.e., don't increment numProbes or + // advance idx): this case can happen if the constructor for + // ValueT threw for this very cell (the rethrow block above). + continue; } + ++numProbes; if (UNLIKELY(numProbes >= capacity_)) { // probed every cell...fail return SimpleRetT(capacity_, false); } + + idx = probeNext(idx, numProbes); } } @@ -191,13 +202,13 @@ erase(KeyT key_in) { idx = probeNext(idx, numProbes)) { DCHECK_LT(idx, capacity_); value_type* cell = &cells_[idx]; - if (relaxedLoadKey(*cell) == kEmptyKey_ || - relaxedLoadKey(*cell) == kLockedKey_) { + KeyT currentKey = acquireLoadKey(*cell); + if (currentKey == kEmptyKey_ || currentKey == kLockedKey_) { // If we hit an empty (or locked) element, this key does not exist. This // is similar to how it's handled in find(). return 0; } - if (key_in == relaxedLoadKey(*cell)) { + if (key_in == currentKey) { // Found an existing entry for our key, attempt to mark it erased. // Some other thread may have erased our key, but this is ok. KeyT expect = key_in; @@ -350,7 +361,7 @@ struct AtomicHashArray::aha_iterator } bool isValid() const { - KeyT key = relaxedLoadKey(aha_->cells_[offset_]); + KeyT key = acquireLoadKey(aha_->cells_[offset_]); return key != aha_->kEmptyKey_ && key != aha_->kLockedKey_ && key != aha_->kErasedKey_; diff --git a/folly/AtomicHashArray.h b/folly/AtomicHashArray.h index 640605e4..e1bcc5d0 100644 --- a/folly/AtomicHashArray.h +++ b/folly/AtomicHashArray.h @@ -232,6 +232,10 @@ class AtomicHashArray : boost::noncopyable { return cellKeyPtr(r)->load(std::memory_order_relaxed); } + static KeyT acquireLoadKey(const value_type& r) { + return cellKeyPtr(r)->load(std::memory_order_acquire); + } + // Fun with thread local storage - atomic increment is expensive // (relatively), so we accumulate in the thread cache and periodically // flush to the actual variable, and walk through the unflushed counts when @@ -259,7 +263,7 @@ class AtomicHashArray : boost::noncopyable { inline bool tryLockCell(value_type* const cell) { KeyT expect = kEmptyKey_; return cellKeyPtr(*cell)->compare_exchange_strong(expect, kLockedKey_, - std::memory_order_acquire); + std::memory_order_acq_rel); } inline size_t keyToAnchorIdx(const KeyT k) const { -- 2.34.1