Fix some memory_order parameters in AtomicHashMap
authorJordan DeLong <jdelong@fb.com>
Sun, 3 Jun 2012 21:58:50 +0000 (14:58 -0700)
committerAndrew Gallagher <agallagher@fb.com>
Tue, 5 Jun 2012 23:21:05 +0000 (16:21 -0700)
Summary: Herb Sutter noticed an error in the AHM find pseudo-code at
the C++ conference, and the error is in the real implementation too.
We should be using memory_order_acquire, because we need the load of
the key to happen before any subsequent loads of the value itself.
Our implementation is conservative for a memory_order_relaxed load
(we generate the same code as if it were memory_order_acquire---i.e. a
compiler barrier but no memory barriers), so this should not change the
generated code.  Correctness is a good idea, though (when we upgrade
to gcc 4.7's atomics it might not be as conservative).  Also, erase was
doing three loads when one is enough.  ... also fixes an exception safety
bug in insert.  (Phew.)

Test Plan: Ran the AHM unit tests, debug and opt.

Reviewed By: andrei.alexandrescu@fb.com

FB internal diff: D485744

folly/AtomicHashArray-inl.h
folly/AtomicHashArray.h

index 4d83f38be110f630abf3095462705b8fea405920..5d1a5b1e4fc5b56591975a78c0bcca38c9c5bbf7 100644 (file)
@@ -51,7 +51,7 @@ findInternal(const KeyT key_in) {
   for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
        ;
        idx = probeNext(idx, numProbes)) {
-    const KeyT key = relaxedLoadKey(cells_[idx]);
+    const KeyT key = acquireLoadKey(cells_[idx]);
     if (LIKELY(key == key_in)) {
       return SimpleRetT(idx, true);
     }
@@ -87,9 +87,10 @@ insertInternal(const value_type& record) {
   CHECK_NE(key_in, kEmptyKey_);
   CHECK_NE(key_in, kLockedKey_);
   CHECK_NE(key_in, kErasedKey_);
-  for (size_t idx = keyToAnchorIdx(key_in), numProbes = 0;
-       ;
-       idx = probeNext(idx, numProbes)) {
+
+  size_t idx = keyToAnchorIdx(key_in);
+  size_t numProbes = 0;
+  for (;;) {
     DCHECK_LT(idx, capacity_);
     value_type* cell = &cells_[idx];
     if (relaxedLoadKey(*cell) == kEmptyKey_) {
@@ -133,6 +134,8 @@ insertInternal(const value_type& record) {
             new (&cell->second) ValueT(record.second);
             unlockCell(cell, key_in); // Sets the new key
           } catch (...) {
+            // Transition back to empty key---requires handling
+            // locked->empty below.
             unlockCell(cell, kEmptyKey_);
             --numPendingEntries_;
             throw;
@@ -149,23 +152,31 @@ insertInternal(const value_type& record) {
       }
     }
     DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
-    if (kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire)) {
+    if (kLockedKey_ == acquireLoadKey(*cell)) {
       FOLLY_SPIN_WAIT(
-        kLockedKey_ == cellKeyPtr(*cell)->load(std::memory_order_acquire)
+        kLockedKey_ == acquireLoadKey(*cell)
       );
     }
-    DCHECK(relaxedLoadKey(*cell) != kEmptyKey_);
-    DCHECK(relaxedLoadKey(*cell) != kLockedKey_);
-    if (key_in == relaxedLoadKey(*cell)) {
+
+    const KeyT thisKey = acquireLoadKey(*cell);
+    if (thisKey == key_in) {
       // Found an existing entry for our key, but we don't overwrite the
       // previous value.
       return SimpleRetT(idx, false);
+    } else if (thisKey == kEmptyKey_ || thisKey == kLockedKey_) {
+      // We need to try again (i.e., don't increment numProbes or
+      // advance idx): this case can happen if the constructor for
+      // ValueT threw for this very cell (the rethrow block above).
+      continue;
     }
+
     ++numProbes;
     if (UNLIKELY(numProbes >= capacity_)) {
       // probed every cell...fail
       return SimpleRetT(capacity_, false);
     }
+
+    idx = probeNext(idx, numProbes);
   }
 }
 
@@ -191,13 +202,13 @@ erase(KeyT key_in) {
        idx = probeNext(idx, numProbes)) {
     DCHECK_LT(idx, capacity_);
     value_type* cell = &cells_[idx];
-    if (relaxedLoadKey(*cell) == kEmptyKey_ ||
-        relaxedLoadKey(*cell) == kLockedKey_) {
+    KeyT currentKey = acquireLoadKey(*cell);
+    if (currentKey == kEmptyKey_ || currentKey == kLockedKey_) {
       // If we hit an empty (or locked) element, this key does not exist. This
       // is similar to how it's handled in find().
       return 0;
     }
-    if (key_in == relaxedLoadKey(*cell)) {
+    if (key_in == currentKey) {
       // Found an existing entry for our key, attempt to mark it erased.
       // Some other thread may have erased our key, but this is ok.
       KeyT expect = key_in;
@@ -350,7 +361,7 @@ struct AtomicHashArray<KeyT, ValueT, HashFcn>::aha_iterator
   }
 
   bool isValid() const {
-    KeyT key = relaxedLoadKey(aha_->cells_[offset_]);
+    KeyT key = acquireLoadKey(aha_->cells_[offset_]);
     return key != aha_->kEmptyKey_  &&
       key != aha_->kLockedKey_ &&
       key != aha_->kErasedKey_;
index 640605e49906265a7cf1d7ad82c9c0ededca38d6..e1bcc5d026a70c24af176de340a6adb3b45b9a0e 100644 (file)
@@ -232,6 +232,10 @@ class AtomicHashArray : boost::noncopyable {
     return cellKeyPtr(r)->load(std::memory_order_relaxed);
   }
 
+  static KeyT acquireLoadKey(const value_type& r) {
+    return cellKeyPtr(r)->load(std::memory_order_acquire);
+  }
+
   // Fun with thread local storage - atomic increment is expensive
   // (relatively), so we accumulate in the thread cache and periodically
   // flush to the actual variable, and walk through the unflushed counts when
@@ -259,7 +263,7 @@ class AtomicHashArray : boost::noncopyable {
   inline bool tryLockCell(value_type* const cell) {
     KeyT expect = kEmptyKey_;
     return cellKeyPtr(*cell)->compare_exchange_strong(expect, kLockedKey_,
-      std::memory_order_acquire);
+      std::memory_order_acq_rel);
   }
 
   inline size_t keyToAnchorIdx(const KeyT k) const {