2 * Copyright 2015 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
19 #include <folly/wangle/rx/types.h> // must come first
20 #include <folly/wangle/rx/Subject.h>
21 #include <folly/wangle/rx/Subscription.h>
23 #include <folly/RWSpinLock.h>
24 #include <folly/SmallLocks.h>
25 #include <folly/ThreadLocal.h>
26 #include <folly/small_vector.h>
27 #include <folly/Executor.h>
28 #include <folly/Memory.h>
32 namespace folly { namespace wangle {
34 template <class T, size_t InlineObservers>
37 Observable() : nextSubscriptionId_{1} {}
39 // TODO perhaps we want to provide this #5283229
40 Observable(Observable&& other) = delete;
42 virtual ~Observable() {
44 unsubscriber_->disable();
48 // The next three methods subscribe the given Observer to this Observable.
50 // If these are called within an Observer callback, the new observer will not
51 // get the current update but will get subsequent updates.
53 // subscribe() returns a Subscription object. The observer will continue to
54 // get updates until the Subscription is destroyed.
56 // observe(ObserverPtr<T>) creates an indefinite subscription
58 // observe(Observer<T>*) also creates an indefinite subscription, but the
59 // caller is responsible for ensuring that the given Observer outlives this
60 // Observable. This might be useful in high performance environments where
61 // allocations must be kept to a minimum. Template parameter InlineObservers
62 // specifies how many observers can been subscribed inline without any
63 // allocations (it's just the size of a folly::small_vector).
64 virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
65 return subscribeImpl(observer, false);
68 virtual void observe(ObserverPtr<T> observer) {
69 subscribeImpl(observer, true);
72 virtual void observe(Observer<T>* observer) {
73 if (inCallback_ && *inCallback_) {
75 newObservers_.reset(new ObserverList());
77 newObservers_->push_back(observer);
79 RWSpinLock::WriteHolder{&observersLock_};
80 observers_.push_back(observer);
84 // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
86 /// Returns a new Observable that will call back on the given Scheduler.
87 /// The returned Observable must outlive the parent Observable.
89 // This and subscribeOn should maybe just be a first-class feature of an
90 // Observable, rather than making new ones whose lifetimes are tied to their
91 // parents. In that case it'd return a reference to this object for
93 ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
94 // you're right Hannes, if we have Observable::create we don't need this
96 struct ViaSubject : public Observable<T>
98 ViaSubject(SchedulerPtr sched,
100 : scheduler_(sched), observable_(obs)
103 Subscription<T> subscribe(ObserverPtr<T> o) override {
104 return observable_->subscribe(
106 [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
107 [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
108 [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
112 SchedulerPtr scheduler_;
113 Observable* observable_;
116 return std::make_shared<ViaSubject>(scheduler, this);
119 /// Returns a new Observable that will subscribe to this parent Observable
120 /// via the given Scheduler. This can be subtle and confusing at first, see
121 /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
122 std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
123 struct Subject_ : public Subject<T> {
125 Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
128 Subscription<T> subscribe(ObserverPtr<T> o) {
129 scheduler_->add([=] {
130 observable_->subscribe(o);
132 return Subscription<T>(nullptr, 0); // TODO
136 SchedulerPtr scheduler_;
137 Observable* observable_;
140 return folly::make_unique<Subject_>(scheduler, this);
144 // Safely execute an operation on each observer. F must take a single
145 // Observer<T>* as its argument.
147 void forEachObserver(F f) {
148 if (UNLIKELY(!inCallback_)) {
149 inCallback_.reset(new bool{false});
151 CHECK(!(*inCallback_));
155 RWSpinLock::ReadHolder rh(observersLock_);
156 for (auto o : observers_) {
160 for (auto& kv : subscribers_) {
165 if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
166 (newSubscribers_ && !newSubscribers_->empty()) ||
167 (oldSubscribers_ && !oldSubscribers_->empty()))) {
169 RWSpinLock::WriteHolder wh(observersLock_);
171 for (auto observer : *(newObservers_)) {
172 observers_.push_back(observer);
174 newObservers_->clear();
176 if (newSubscribers_) {
177 for (auto& kv : *(newSubscribers_)) {
178 subscribers_.insert(std::move(kv));
180 newSubscribers_->clear();
182 if (oldSubscribers_) {
183 for (auto id : *(oldSubscribers_)) {
184 subscribers_.erase(id);
186 oldSubscribers_->clear();
190 *inCallback_ = false;
194 Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
195 auto subscription = makeSubscription(indefinite);
196 typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
197 if (inCallback_ && *inCallback_) {
198 if (!newSubscribers_) {
199 newSubscribers_.reset(new SubscriberMap());
201 newSubscribers_->insert(std::move(kv));
203 RWSpinLock::WriteHolder{&observersLock_};
204 subscribers_.insert(std::move(kv));
211 explicit Unsubscriber(Observable* observable) : observable_(observable) {
215 void unsubscribe(uint64_t id) {
217 RWSpinLock::ReadHolder guard(lock_);
219 observable_->unsubscribe(id);
224 RWSpinLock::WriteHolder guard(lock_);
225 observable_ = nullptr;
230 Observable* observable_;
233 std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
234 MicroSpinLock unsubscriberLock_{0};
236 friend class Subscription<T>;
238 void unsubscribe(uint64_t id) {
239 if (inCallback_ && *inCallback_) {
240 if (!oldSubscribers_) {
241 oldSubscribers_.reset(new std::vector<uint64_t>());
243 if (newSubscribers_) {
244 auto it = newSubscribers_->find(id);
245 if (it != newSubscribers_->end()) {
246 newSubscribers_->erase(it);
250 oldSubscribers_->push_back(id);
252 RWSpinLock::WriteHolder{&observersLock_};
253 subscribers_.erase(id);
257 Subscription<T> makeSubscription(bool indefinite) {
259 return Subscription<T>(nullptr, nextSubscriptionId_++);
261 if (!unsubscriber_) {
262 std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
263 if (!unsubscriber_) {
264 unsubscriber_ = std::make_shared<Unsubscriber>(this);
267 return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
271 std::atomic<uint64_t> nextSubscriptionId_;
272 RWSpinLock observersLock_;
273 folly::ThreadLocalPtr<bool> inCallback_;
275 typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
276 ObserverList observers_;
277 folly::ThreadLocalPtr<ObserverList> newObservers_;
279 typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
280 SubscriberMap subscribers_;
281 folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
282 folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;