thread safety for Observable::observers_
authorJames Sedgwick <jsedgwick@fb.com>
Tue, 23 Sep 2014 13:12:18 +0000 (06:12 -0700)
committerAnton Likhtarov <alikhtarov@fb.com>
Fri, 26 Sep 2014 22:21:51 +0000 (15:21 -0700)
Summary: this way we can subscribe to an observable and blast data through it simultaneously from different threads

Test Plan: not much... the one client of rx compiles

Reviewed By: davejwatson@fb.com

Subscribers: fugalh, njormrod, bmatheny

FB internal diff: D1560647

folly/experimental/wangle/rx/Observable.h
folly/experimental/wangle/rx/Subject.h
folly/experimental/wangle/rx/test/RxTest.cpp [new file with mode: 0644]

index 4769007f91095d7dd904d41e3a30fa7efeac1ec7..f85d52a252c42e39734169a0f7ad79981da6338a 100644 (file)
@@ -20,6 +20,8 @@
 #include "Subject.h"
 #include "Subscription.h"
 
+#include <folly/RWSpinLock.h>
+#include <folly/ThreadLocal.h>
 #include <folly/wangle/Executor.h>
 #include <list>
 #include <memory>
 namespace folly { namespace wangle {
 
 template <class T>
-struct Observable {
+class Observable {
+ public:
+  Observable() = default;
+  Observable(Observable&& other) noexcept {
+    RWSpinLock::WriteHolder{&other.observersLock_};
+    observers_ = std::move(other.observers_);
+  }
+
   virtual ~Observable() = default;
 
   /// Subscribe the given Observer to this Observable.
   // Eventually this will return a Subscription object of some kind, in order
   // to support cancellation. This is kinda really important. Maybe I should
   // just do it now, using an dummy Subscription object.
+  //
+  // If this is called within an Observer callback, the new observer will not
+  // get the current update but will get subsequent updates.
   virtual Subscription subscribe(ObserverPtr<T> o) {
-    observers_.push_back(o);
+    if (inCallback_ && *inCallback_) {
+      if (!newObservers_) {
+        newObservers_.reset(new std::list<ObserverPtr<T>>());
+      }
+      newObservers_->push_back(o);
+    } else {
+      RWSpinLock::WriteHolder{&observersLock_};
+      observers_.push_back(o);
+    }
     return Subscription();
   }
 
@@ -97,7 +117,48 @@ struct Observable {
   }
 
  protected:
+  const std::list<ObserverPtr<T>>& getObservers() {
+    return observers_;
+  }
+
+  // This guard manages deferred modification of the observers list.
+  // Subclasses should use this guard if they want to subscribe new observers
+  // in the course of a callback. New observers won't be added until the guard
+  // goes out of scope. See Subject for an example.
+  class ObserversGuard {
+   public:
+    explicit ObserversGuard(Observable* o) : o_(o) {
+      if (UNLIKELY(!o_->inCallback_)) {
+        o_->inCallback_.reset(new bool{false});
+      }
+      CHECK(!(*o_->inCallback_));
+      *o_->inCallback_ = true;
+      o_->observersLock_.lock_shared();
+    }
+
+    ~ObserversGuard() {
+      o_->observersLock_.unlock_shared();
+      if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) {
+        {
+          RWSpinLock::WriteHolder(o_->observersLock_);
+          for (auto& o : *(o_->newObservers_)) {
+            o_->observers_.push_back(o);
+          }
+        }
+        o_->newObservers_->clear();
+      }
+      *o_->inCallback_ = false;
+    }
+
+   private:
+    Observable* o_;
+  };
+
+ private:
   std::list<ObserverPtr<T>> observers_;
+  RWSpinLock observersLock_;
+  folly::ThreadLocalPtr<bool> inCallback_;
+  folly::ThreadLocalPtr<std::list<ObserverPtr<T>>> newObservers_;
 };
 
 }}
index 41b59c4e8182d74738836db11dd1e19d7f21dbb1..7d4c7cb8da46464cfaa900171dc2fed80e78beb1 100644 (file)
@@ -25,17 +25,24 @@ namespace folly { namespace wangle {
 /// observed events to the Subject's observers.
 template <class T>
 struct Subject : public Observable<T>, public Observer<T> {
+  typedef typename Observable<T>::ObserversGuard ObserversGuard;
   void onNext(T val) override {
-    for (auto& o : this->observers_)
+    ObserversGuard guard(this);
+    for (auto& o : Observable<T>::getObservers()) {
       o->onNext(val);
+    }
   }
   void onError(Error e) override {
-    for (auto& o : this->observers_)
+    ObserversGuard guard(this);
+    for (auto& o : Observable<T>::getObservers()) {
       o->onError(e);
+    }
   }
   void onCompleted() override {
-    for (auto& o : this->observers_)
+    ObserversGuard guard(this);
+    for (auto& o : Observable<T>::getObservers()) {
       o->onCompleted();
+    }
   }
 };
 
diff --git a/folly/experimental/wangle/rx/test/RxTest.cpp b/folly/experimental/wangle/rx/test/RxTest.cpp
new file mode 100644 (file)
index 0000000..cf4d9dd
--- /dev/null
@@ -0,0 +1,39 @@
+/*
+ * Copyright 2014 Facebook, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <folly/experimental/wangle/rx/Observer.h>
+#include <folly/experimental/wangle/rx/Subject.h>
+#include <gtest/gtest.h>
+
+using namespace folly::wangle;
+
+TEST(RxTest, SubscribeDuringCallback) {
+  // A subscriber who was subscribed in the course of a callback should get
+  // subsequent updates but not the current update.
+  Subject<int> subject;
+  int outerCount = 0;
+  int innerCount = 0;
+  subject.subscribe(Observer<int>::create([&] (int x) {
+    outerCount++;
+    subject.subscribe(Observer<int>::create([&] (int y) {
+      innerCount++;
+    }));
+  }));
+  subject.onNext(42);
+  subject.onNext(0xDEADBEEF);
+  EXPECT_EQ(2, outerCount);
+  EXPECT_EQ(1, innerCount);
+}