Copyright 2014->2015
[folly.git] / folly / wangle / rx / Observable.h
1 /*
2  * Copyright 2015 Facebook, Inc.
3  *
4  * Licensed under the Apache License, Version 2.0 (the "License");
5  * you may not use this file except in compliance with the License.
6  * You may obtain a copy of the License at
7  *
8  *   http://www.apache.org/licenses/LICENSE-2.0
9  *
10  * Unless required by applicable law or agreed to in writing, software
11  * distributed under the License is distributed on an "AS IS" BASIS,
12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  * See the License for the specific language governing permissions and
14  * limitations under the License.
15  */
16
17 #pragma once
18
19 #include <folly/wangle/rx/Subject.h>
20 #include <folly/wangle/rx/Subscription.h>
21 #include <folly/wangle/rx/types.h>
22
23 #include <folly/RWSpinLock.h>
24 #include <folly/SmallLocks.h>
25 #include <folly/ThreadLocal.h>
26 #include <folly/small_vector.h>
27 #include <folly/Executor.h>
28 #include <map>
29 #include <memory>
30
31 namespace folly { namespace wangle {
32
33 template <class T, size_t InlineObservers>
34 class Observable {
35  public:
36   Observable() : nextSubscriptionId_{1} {}
37
38   // TODO perhaps we want to provide this #5283229
39   Observable(Observable&& other) = delete;
40
41   virtual ~Observable() {
42     if (unsubscriber_) {
43       unsubscriber_->disable();
44     }
45   }
46
47   // The next three methods subscribe the given Observer to this Observable.
48   //
49   // If these are called within an Observer callback, the new observer will not
50   // get the current update but will get subsequent updates.
51   //
52   // subscribe() returns a Subscription object. The observer will continue to
53   // get updates until the Subscription is destroyed.
54   //
55   // observe(ObserverPtr<T>) creates an indefinite subscription
56   //
57   // observe(Observer<T>*) also creates an indefinite subscription, but the
58   // caller is responsible for ensuring that the given Observer outlives this
59   // Observable. This might be useful in high performance environments where
60   // allocations must be kept to a minimum. Template parameter InlineObservers
61   // specifies how many observers can been subscribed inline without any
62   // allocations (it's just the size of a folly::small_vector).
63   virtual Subscription<T> subscribe(ObserverPtr<T> observer) {
64     return subscribeImpl(observer, false);
65   }
66
67   virtual void observe(ObserverPtr<T> observer) {
68     subscribeImpl(observer, true);
69   }
70
71   virtual void observe(Observer<T>* observer) {
72     if (inCallback_ && *inCallback_) {
73       if (!newObservers_) {
74         newObservers_.reset(new ObserverList());
75       }
76       newObservers_->push_back(observer);
77     } else {
78       RWSpinLock::WriteHolder{&observersLock_};
79       observers_.push_back(observer);
80     }
81   }
82
83   // TODO unobserve(ObserverPtr<T>), unobserve(Observer<T>*)
84
85   /// Returns a new Observable that will call back on the given Scheduler.
86   /// The returned Observable must outlive the parent Observable.
87
88   // This and subscribeOn should maybe just be a first-class feature of an
89   // Observable, rather than making new ones whose lifetimes are tied to their
90   // parents. In that case it'd return a reference to this object for
91   // chaining.
92   ObservablePtr<T> observeOn(SchedulerPtr scheduler) {
93     // you're right Hannes, if we have Observable::create we don't need this
94     // helper class.
95     struct ViaSubject : public Observable<T>
96     {
97       ViaSubject(SchedulerPtr sched,
98                  Observable* obs)
99         : scheduler_(sched), observable_(obs)
100       {}
101
102       Subscription<T> subscribe(ObserverPtr<T> o) override {
103         return observable_->subscribe(
104           Observer<T>::create(
105             [=](T val) { scheduler_->add([o, val] { o->onNext(val); }); },
106             [=](Error e) { scheduler_->add([o, e] { o->onError(e); }); },
107             [=]() { scheduler_->add([o] { o->onCompleted(); }); }));
108       }
109
110      protected:
111       SchedulerPtr scheduler_;
112       Observable* observable_;
113     };
114
115     return std::make_shared<ViaSubject>(scheduler, this);
116   }
117
118   /// Returns a new Observable that will subscribe to this parent Observable
119   /// via the given Scheduler. This can be subtle and confusing at first, see
120   /// http://www.introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#SubscribeOnObserveOn
121   std::unique_ptr<Observable> subscribeOn(SchedulerPtr scheduler) {
122     struct Subject_ : public Subject<T> {
123      public:
124       Subject_(SchedulerPtr s, Observable* o) : scheduler_(s), observable_(o) {
125       }
126
127       Subscription<T> subscribe(ObserverPtr<T> o) {
128         scheduler_->add([=] {
129           observable_->subscribe(o);
130         });
131         return Subscription<T>(nullptr, 0); // TODO
132       }
133
134      protected:
135       SchedulerPtr scheduler_;
136       Observable* observable_;
137     };
138
139     return folly::make_unique<Subject_>(scheduler, this);
140   }
141
142  protected:
143   // Safely execute an operation on each observer. F must take a single
144   // Observer<T>* as its argument.
145   template <class F>
146   void forEachObserver(F f) {
147     if (UNLIKELY(!inCallback_)) {
148       inCallback_.reset(new bool{false});
149     }
150     CHECK(!(*inCallback_));
151     *inCallback_ = true;
152
153     {
154       RWSpinLock::ReadHolder rh(observersLock_);
155       for (auto o : observers_) {
156         f(o);
157       }
158
159       for (auto& kv : subscribers_) {
160         f(kv.second.get());
161       }
162     }
163
164     if (UNLIKELY((newObservers_ && !newObservers_->empty()) ||
165                  (newSubscribers_ && !newSubscribers_->empty()) ||
166                  (oldSubscribers_ && !oldSubscribers_->empty()))) {
167       {
168         RWSpinLock::WriteHolder wh(observersLock_);
169         if (newObservers_) {
170           for (auto observer : *(newObservers_)) {
171             observers_.push_back(observer);
172           }
173           newObservers_->clear();
174         }
175         if (newSubscribers_) {
176           for (auto& kv : *(newSubscribers_)) {
177             subscribers_.insert(std::move(kv));
178           }
179           newSubscribers_->clear();
180         }
181         if (oldSubscribers_) {
182           for (auto id : *(oldSubscribers_)) {
183             subscribers_.erase(id);
184           }
185           oldSubscribers_->clear();
186         }
187       }
188     }
189     *inCallback_ = false;
190   }
191
192  private:
193   Subscription<T> subscribeImpl(ObserverPtr<T> observer, bool indefinite) {
194     auto subscription = makeSubscription(indefinite);
195     typename SubscriberMap::value_type kv{subscription.id_, std::move(observer)};
196     if (inCallback_ && *inCallback_) {
197       if (!newSubscribers_) {
198         newSubscribers_.reset(new SubscriberMap());
199       }
200       newSubscribers_->insert(std::move(kv));
201     } else {
202       RWSpinLock::WriteHolder{&observersLock_};
203       subscribers_.insert(std::move(kv));
204     }
205     return subscription;
206   }
207
208   class Unsubscriber {
209    public:
210     explicit Unsubscriber(Observable* observable) : observable_(observable) {
211       CHECK(observable_);
212     }
213
214     void unsubscribe(uint64_t id) {
215       CHECK(id > 0);
216       RWSpinLock::ReadHolder guard(lock_);
217       if (observable_) {
218         observable_->unsubscribe(id);
219       }
220     }
221
222     void disable() {
223       RWSpinLock::WriteHolder guard(lock_);
224       observable_ = nullptr;
225     }
226
227    private:
228     RWSpinLock lock_;
229     Observable* observable_;
230   };
231
232   std::shared_ptr<Unsubscriber> unsubscriber_{nullptr};
233   MicroSpinLock unsubscriberLock_{0};
234
235   friend class Subscription<T>;
236
237   void unsubscribe(uint64_t id) {
238     if (inCallback_ && *inCallback_) {
239       if (!oldSubscribers_) {
240         oldSubscribers_.reset(new std::vector<uint64_t>());
241       }
242       if (newSubscribers_) {
243         auto it = newSubscribers_->find(id);
244         if (it != newSubscribers_->end()) {
245           newSubscribers_->erase(it);
246           return;
247         }
248       }
249       oldSubscribers_->push_back(id);
250     } else {
251       RWSpinLock::WriteHolder{&observersLock_};
252       subscribers_.erase(id);
253     }
254   }
255
256   Subscription<T> makeSubscription(bool indefinite) {
257     if (indefinite) {
258       return Subscription<T>(nullptr, nextSubscriptionId_++);
259     } else {
260       if (!unsubscriber_) {
261         std::lock_guard<MicroSpinLock> guard(unsubscriberLock_);
262         if (!unsubscriber_) {
263           unsubscriber_ = std::make_shared<Unsubscriber>(this);
264         }
265       }
266       return Subscription<T>(unsubscriber_, nextSubscriptionId_++);
267     }
268   }
269
270   std::atomic<uint64_t> nextSubscriptionId_;
271   RWSpinLock observersLock_;
272   folly::ThreadLocalPtr<bool> inCallback_;
273
274   typedef folly::small_vector<Observer<T>*, InlineObservers> ObserverList;
275   ObserverList observers_;
276   folly::ThreadLocalPtr<ObserverList> newObservers_;
277
278   typedef std::map<uint64_t, ObserverPtr<T>> SubscriberMap;
279   SubscriberMap subscribers_;
280   folly::ThreadLocalPtr<SubscriberMap> newSubscribers_;
281   folly::ThreadLocalPtr<std::vector<uint64_t>> oldSubscribers_;
282 };
283
284 }}