2 * Copyright 2014 Facebook, Inc.
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
8 * http://www.apache.org/licenses/LICENSE-2.0
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
21 #include "Subscription.h"
23 #include <folly/RWSpinLock.h>
24 #include <folly/ThreadLocal.h>
25 #include <folly/wangle/Executor.h>
29 namespace folly { namespace wangle {
34 Observable() = default;
35 Observable(Observable&& other) noexcept {
36 RWSpinLock::WriteHolder{&other.observersLock_};
37 observers_ = std::move(other.observers_);
40 virtual ~Observable() = default;
42 /// Subscribe the given Observer to this Observable.
43 // Eventually this will return a Subscription object of some kind, in order
44 // to support cancellation. This is kinda really important. Maybe I should
45 // just do it now, using an dummy Subscription object.
47 // If this is called within an Observer callback, the new observer will not
48 // get the current update but will get subsequent updates.
49 virtual Subscription subscribe(ObserverPtr<T> o) {
50 if (inCallback_ && *inCallback_) {
52 newObservers_.reset(new std::list<ObserverPtr<T>>());
54 newObservers_->push_back(o);
56 RWSpinLock::WriteHolder{&observersLock_};
57 observers_.push_back(o);
59 return Subscription();
62 /// Returns a new Observable that will call back on the given Scheduler.
63 /// The returned Observable must outlive the parent Observable.
65 // This and subscribeOn should maybe just be a first-class feature of an
66 // Observable, rather than making new ones whose lifetimes are tied to their
67 // parents. In that case it'd return a reference to this object for
69 ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
70 // you're right Hannes, if we have Observable::create we don't need this
// Local adapter type: an Observable whose subscribe() forwards to the parent
// Observable, re-posting every notification through scheduler_.
72 struct ViaSubject : public Observable<T>
// Stores the scheduler and a pointer to the parent Observable.
74 ViaSubject(SchedulerPtr scheduler,
76 : scheduler_(scheduler), observable_(obs)
// Subscribes to the parent with three callbacks. Each callback does no work
// itself; it enqueues a task on scheduler_ that invokes the matching method
// (onNext / onError / onCompleted) on the caller's observer `o`.
79 Subscription subscribe(ObserverPtr<T> o) override {
80 return observable_->subscribe(
// NOTE: scheduler_ is a member, so the outer [=] captures `this`. The
// callbacks registered on the parent therefore dereference this ViaSubject
// each time the parent emits, which is why the returned Observable has to
// stay alive for as long as the parent can emit. The inner lambdas copy
// `o` and the value/error, so the queued tasks themselves do not need `this`.
82 [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
83 [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
84 [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
// Scheduler on which observer callbacks are executed.
88 SchedulerPtr scheduler_;
// Non-owning pointer to the parent Observable; it must be valid whenever
// subscribe() is called on this adapter.
89 Observable* observable_;
// Hand back a shared adapter bound to this Observable as its parent.
92 return std::make_shared<ViaSubject>(scheduler, this);
95 /// Returns a new Observable that will subscribe to this parent Observable
96 /// via the given Scheduler. This can be subtle and confusing at first, see
97 /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
98 std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
99 struct Subject_ : public Subject<T> {
101 Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
104 Subscription subscribe(ObserverPtr<T> o) {
105 scheduler_->add([=] {
106 observable_->subscribe(o);
108 return Subscription();
112 SchedulerPtr scheduler_;
113 Observable* observable_;
116 return folly::make_unique<Subject_>(scheduler, this);
/// Accessor that returns a list of observers by const reference.
// NOTE(review): presumably this hands back observers_, and callers are
// expected to hold an ObserversGuard (shared lock) while iterating the
// result -- confirm against the callers (e.g. Subject).
120 const std::list<ObserverPtr<T>>& getObservers() {
124 // This guard manages deferred modification of the observers list.
125 // Subclasses should use this guard if they want to subscribe new observers
126 // in the course of a callback. New observers won't be added until the guard
127 // goes out of scope. See Subject for an example.
128 class ObserversGuard {
130 explicit ObserversGuard(Observable* o) : o_(o) {
131 if (UNLIKELY(!o_->inCallback_)) {
132 o_->inCallback_.reset(new bool{false});
134 CHECK(!(*o_->inCallback_));
135 *o_->inCallback_ = true;
136 o_->observersLock_.lock_shared();
140 o_->observersLock_.unlock_shared();
141 if (UNLIKELY(o_->newObservers_ && !o_->newObservers_->empty())) {
143 RWSpinLock::WriteHolder(o_->observersLock_);
144 for (auto& o : *(o_->newObservers_)) {
145 o_->observers_.push_back(o);
148 o_->newObservers_->clear();
150 *o_->inCallback_ = false;
// Observers currently subscribed. Modifications are meant to happen under
// the exclusive side of observersLock_; ObserversGuard holds the shared side
// for the duration of a callback.
158 std::list<ObserverPtr<T>> observers_;
// Reader/writer spin lock protecting observers_.
159 RWSpinLock observersLock_;
// Per-thread flag, set to true by ObserversGuard while the current thread is
// inside a guarded callback and reset to false when the guard is done.
// Allocated lazily, so the pointer itself may be null.
160 folly::ThreadLocalPtr<bool> inCallback_;
// Per-thread list of observers that subscribed during a callback. They are
// appended to observers_ by ObserversGuard once the callback has finished.
// Allocated lazily, so the pointer itself may be null.
161 folly::ThreadLocalPtr<std::list<ObserverPtr<T>>> newObservers_;